In [53]:
from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator


from time import strftime,localtime
from web_scraper import *

In [2]:
from os import environ, listdir
from db_utils import make_engine
import pandas as pd
from pyspark.sql import SparkSession
db = 'nba'

engine = make_engine(environ.get('USER'),environ.get('PSWD'),db)

In [3]:
def get_last_year(year):
        if(year > 2000):
            return year % 2000
        return year % 100

In [4]:
def extract(start_date):
        bs = Box_scores()
        year = int(strftime("%Y",localtime()))

        start_reg = 9
        end_reg = 4
        start_post = end_reg
        end_post = 7
        reg_season = True

        month = int(strftime('%m',localtime()))
        year_range = str(year-1) + "-{:0>2d}".format(get_last_year(year))

        if month <= end_reg or month >= start_reg:

                url = bs.build_url(year_range, start_date, reg_season)

        elif month >= start_post and month <= end_post:

                url = bs.build_url(year_range, start_date, not reg_season)
        else:
                return # exit dag

        date = strftime('%Y-%m-%d',localtime())
        count = 0
        for p in bs.iter_all(url):
                df = pd.read_html(p,flavor = 'bs4')[0]

                pids, tids, gids = bs.get_player_and_team_ids(p)
                
                df['pids'] = pids
                df['tids'] = tids
                df['gids'] = gids
                
                fp = '../../../data/'+date+'_page_'+str(count)+'.csv'
                df.to_csv(fp,index=False)


                count += 1

In [5]:
def extract_schema():
    from pyspark.sql.types import StructType
# path = '../../../data/'

    # schema = StructType().add('Player','string').add('Team','string').add('Match Up','string').\
    #     add('Game Date','string').add('Season','string').add('W/L','string').add('MIN','integer').\
    #     add('PTS', 'integer').add('FGM', 'integer').add('FGA', 'integer').add('FG%', 'float').\
    #     add('3PM', 'integer').add('3PA', 'integer').add('3P%', 'float').add('FTM', 'integer').\
    #     add('FTA', 'integer').add('FT%', 'float').add('OREB', 'integer').add('DREB', 'integer').\
    #     add('REB', 'integer').add('AST', 'integer').add('STL', 'integer').add('BLK', 'integer').\
    #     add('TOV', 'integer').add('PF', 'integer').add('+/-', 'integer').add('FP', 'float').\
    #     add('pids', 'integer').add('tids', 'integer').add('gids','string')

    schema = StructType().add('Player','string',False).add('Team','string',False).\
    add('Match Up','string',False).\
    add('Game Date','date',False).add('Season','string').add('W/L','string').add('MIN','integer').\
    add('PTS', 'integer').add('FGM', 'integer').add('FGA', 'integer').add('FG%', 'float').\
    add('3PM', 'integer').add('3PA', 'integer').add('3P%', 'float').add('FTM', 'integer').\
    add('FTA', 'integer').add('FT%', 'float').add('OREB', 'integer').add('DREB', 'integer').\
    add('REB', 'integer').add('AST', 'integer').add('STL', 'integer').add('BLK', 'integer').\
    add('TOV', 'integer').add('PF', 'integer').add('+/-', 'integer').add('FP', 'float').\
    add('pids', 'integer',False).add('tids', 'integer',False).add('gids','string',False)

    return schema

In [6]:
def atom_schema():
    from pyspark.sql.types import StructType

    return StructType().add('id','integer',False).add('name','string',False)

In [7]:
def players(df):
    cols = ['pids','Player']

    rename_cols = ['id','name']

    player_df = df[cols].select('*').distinct()

    player_df = player_df.toDF(*rename_cols)

    player_df.writeStream.outputMode('append').format('csv').\
        option('path','../../../queries/players').option('checkpointLocation','../../../checkpoints').\
        start()

In [8]:
def teams(df):
    cols = ['tids','Team']

    # rename_cols = ['id','name']

    team_df = df[cols].select('*').distinct()

    # team_df = team_df.toDF(*rename_cols)

    team_df.writeStream.outputMode('append').format('csv').\
        option('path','../../../queries/teams').option('checkpointLocation','../../../checkpoints').\
        start()

In [9]:
def box_scores(df):
   cols = ['pids','tids', 'gids', 'MIN', 'PTS', 'FGM', 'FGA', '3PM', '3PA', 'FTM', 'FTA', 'OREB',
      'DREB', 'AST', 'STL', 'BLK', 'TOV', 'PF', '+/-', 'W/L', 'Game Date', 'Match Up']

   box_df = df[cols]

   # rename_cols = ['player_id','team_id', 'game_id', 'mins','pts', 'fgm', 'fga', 'pm3',
   #    'pa3', 'ftm', 'fta', 'oreb','dreb', 'ast', 'stl', 'blk', 'tov', 'pf', 'plus_minus',
   #    'result', 'game_day', 'match_up']

   # box_df = box_df.toDF(*rename_cols)
   box_df.writeStream.outputMode('append').\
      format('csv').option('path','../../../queries/box_scores').\
      option('checkpointLocation','../../../checkpoints').start()

   # box_df.createOrReplaceTempView('box_scores')

In [10]:
def box_score_schema():
    from pyspark.sql.types import StructType

    schema = StructType().add('player_id','integer',False).add('team_id','string',False).\
    add('game_id','string',False).add('mins','integer').\
    add('pts', 'integer').add('fgm', 'integer').add('fga', 'integer').\
    add('pm3', 'integer').add('pa3', 'integer').add('ftm', 'integer').\
    add('fta', 'integer').add('oreb', 'integer').add('dreb', 'integer').\
    add('ast', 'integer').add('stl', 'integer').add('blk', 'integer').\
    add('tov', 'integer').add('pf', 'integer').add('plus_minus', 'integer').\
    add('result','string').add('game_day','date').add('match_up','string')

    return schema

In [11]:
SparkSession.builder.config('spark.driver.extraClassPath',environ.get('CLASSPATH')).getOrCreate()
spark = SparkSession.builder.appName('nba_player_box_score').getOrCreate()

22/06/12 18:06:55 WARN Utils: Your hostname, rpi3 resolves to a loopback address: 127.0.1.1; using 172.25.14.38 instead (on interface wlan0)
22/06/12 18:06:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/12 18:07:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [24]:
def transform(spark):    
    path = '../../../data/'
    # stream data
    df = spark.readStream.option('cleanSource','delete').option('sep',',').csv(path,
    schema=extract_schema(),header=True,dateFormat='MM/dd/yyyy')

    # drop unneeded columns
    df = df.drop('Season','FP','3P%','FG%','FT%','REB')

    box_scores(df)
    players(df)
    teams(df)

    return df

In [13]:
command = 'SELECT MAX(game_day) as GD from box_scores;'
max_date = pd.read_sql(command,engine,parse_dates=['GD'])
max_date = max_date.loc[0,'GD']

extract(max_date)
df = transform(engine,spark)

22/06/12 18:10:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/06/12 18:10:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/06/12 18:10:12 WARN StreamingQueryManager: Stopping existing streaming query [id=57351088-4a62-4603-856f-dd516a3beea1, runId=37c118a7-60de-47b5-a1e6-f8db539d8042], as a new run is being started.
22/06/12 18:10:13 WARN HadoopFSUtils: The directory file:/home/blunt/programming/data_science/data/2022-06-17_page_0.csv was not found. Was it deleted very recently?
22/06/12 18:10:14 WARN Shell: Interrupted while joining on: Thread[Thread-19,5,main]
java.lang.InterruptedException
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Thread.join(Thread.java:1300)
	at java.base/java.lang.Thread.join(Thread.java:1375)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1043)
	at org.apac

In [None]:
def add_new(df, table, engine):
    ids = str(tuple(df['id']))
    existing = pd.read_sql(f'select * from {table} where id in {ids};',engine)

    merged = df.merge(existing,on='id',how='left',indicator=True)
    merged = merged[merged['_merge']=='left_only']

    added = merged.to_sql('table',engine,index=False,if_exists='append',method='multi')

    print(f'{added} rows added')

    return

In [None]:
def primary_keys(spark,table,engine):
    path = f'../../../queries/{table}/*.csv'
    df = spark.read.csv(path,
        schema=atom_schema()).toPandas()

    add_new(df,table,engine)

In [None]:
def add_box_scores(spark,engine):

In [None]:
def load(engine, spark):
    path = '../../../queries/'

    with TaskGroup("primary_keys", tooltip="Add new team and player primary keys") as primary_keys:
        p = PythonOperator(task_id = 'Add new players',python_callable=primary_keys,
            op_kwargs={'spark':spark,'engine':engine,'table':'players'})

        t = PythonOperator(task_id = 'Add new teams',python_callable=primary_keys,
            op_kwargs={'spark':spark,'engine':engine,'table':'teams'})

        # players_df = spark.read.csv(path + 'players/*.csv',
        # schema=atom_schema())
    with TaskGroup("stats", tooltip="Add new box score data") as new_box_scores:
        box_df = spark.read.csv(path + 'box_scores/*.csv',
        schema=box_score_schema())

    primary_keys >> new_box_scores



In [52]:
path = '../../../queries/'
box_df = spark.readStream.option('sep',',').csv(path + 'box_scores/',
schema=box_score_schema())

In [22]:
df = spark.read.csv(path + 'box_scores/*.csv',schema=box_score_schema())

In [25]:
t = df.toPandas()

                                                                                

In [None]:
with DAG('player_box_scores_etl',default_args={'retries': 4},description='ETL DAG tutorial',
schedule_interval='0 10 * * *',catchup=False,tags=['nba_stats']) as dag:

    dag.doc_md = __doc__

    SparkSession.builder.config('spark.driver.extraClassPath',environ.get('CLASSPATH')).getOrCreate()
    spark = SparkSession.builder.appName('nba_player_box_score').getOrCreate()

    command = 'SELECT MAX(game_day) as GD from box_scores;'
    max_date = pd.read_sql(command,engine,parse_dates=['GD'])
    max_date = max_date.loc[0,'GD']

    e = PythonOperator(task_id = '',python_callable=extract,
    op_kwargs={'start_date':max_date})

    t = PythonOperator(task_id = '',python_callable=transform,
    op_kwargs={'start_date':max_date})

    l = PythonOperator(task_id = '',python_callable=load,
    op_kwargs={'spark':spark,'engine':engine})


    e >> t >> l

In [None]:
# spark.sql('select * from testo').show()

In [None]:
max_date- datetime.timedelta(days=2)

In [None]:
SparkSession.builder.config('spark.driver.extraClassPath',environ.get('CLASSPATH')).getOrCreate()
spark = SparkSession.builder.appName('nba_player_box_scores').getOrCreate()

In [None]:
from pyspark.sql.types import *
path = '../../../data/'

schema = StructType().add('Player','string',False).add('Team','string',False).add('Match Up','string',False).\
    add('Game Date','date',False).add('Season','string').add('W/L','string').add('MIN','integer').\
    add('PTS', 'integer').add('FGM', 'integer').add('FGA', 'integer').add('FG%', 'float').\
    add('3PM', 'integer').add('3PA', 'integer').add('3P%', 'float').add('FTM', 'integer').\
    add('FTA', 'integer').add('FT%', 'float').add('OREB', 'integer').add('DREB', 'integer').\
    add('REB', 'integer').add('AST', 'integer').add('STL', 'integer').add('BLK', 'integer').\
    add('TOV', 'integer').add('PF', 'integer').add('+/-', 'integer').add('FP', 'float').\
    add('pids', 'integer',False).add('tids', 'integer',False).add('gids','string',False)

# df = spark.readStream.option('cleanSource','delete').text(path,wholetext=True)
df = spark.readStream.option('cleanSource','delete').option('sep',',').csv(path,
schema=schema,header=True,dateFormat='MM/dd/yyyy')

df = df.drop('Season','FP','3P%','FG%','FT%','REB')

In [None]:
command = 'SELECT MAX(game_day) as GD from box_scores;'
max_date = pd.read_sql(command,engine,parse_dates=['GD'])
max_date = max_date.loc[0,'GD']

In [None]:
df.writeStream.queryName('test').outputMode('append').format('memory').start()

In [None]:
cols = ['tids','Team']

teams_df = df[cols]

In [None]:
teams = spark.readStream.option('cleanSource','delete').option('sep',',').csv(path,inferSchema=True,
header=True,dateFormat='MM/dd/yyyy')


In [None]:
path

In [None]:
teams = teams_df.select('*').distinct()

teams.writeStream.queryName('teams').outputMode('append').format('csv').\
        option('path','../../../queries/teams').option('checkpointLocation','../../../checkpoints').start()

In [None]:
df[player_cols].writeStream.queryName('test2').outputMode('update').format('memory').start()

In [None]:
rename_box_scores = ['player_id','team_id', 'game_id', 'mins','pts', 'fgm', 'fga', 'pm3',
'pa3', 'ftm', 'fta', 'oreb','dreb', 'ast', 'stl', 'blk', 'tov', 'pf', 'plus_minus',
'result', 'game_day', 'match_up']

In [None]:
df1 = df[box_scores]

In [None]:
df1 = df1.toDF(*rename_box_scores)

In [None]:
df1.writeStream.queryName('test4').outputMode('append').format('memory').start()

In [None]:
from pyspark.sql.functions import to_date
df = df.withColumn('Game Date',to_date(df['Game Date'], 'MM/dd/yyyy'))
# df1.printSchema()
# df1.select("birth_date").dtypes
