In [2]:
import pyspark
import os
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [3]:
# Path to the GCP service-account JSON key used by the GCS connector.
# Prefer the standard GOOGLE_APPLICATION_CREDENTIALS environment variable so the
# notebook is not tied to one machine; fall back to the original local path
# when the variable is unset or empty.
credentials_location = (
    os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
    or '/Users/ceanders/.google/credentials/projects/redskins-rule/redskins-rule-docker-airflow-credentials.json'
)

In [4]:
# Spark configuration: local master, the GCS connector jar, and
# service-account authentication pointing at the key file.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", "/usr/local/Cellar/apache-spark/3.5.0/libexec/jars/gcs-connector-hadoop3-2.2.5.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

# getOrCreate() rather than SparkContext(conf=conf): only one SparkContext may be
# active, so constructing a second one when this cell is re-run in a live kernel
# raises an error.
# NOTE: when a context already exists it is returned as-is and `conf` is not
# re-applied; restart the kernel after changing the configuration above.
sc = SparkContext.getOrCreate(conf=conf)

# Register the gs:// filesystem implementations on the Hadoop configuration and
# give them the same service-account key file.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

# Session built on top of the context's effective configuration.
spark = (
    SparkSession.builder
    .appName('test')
    .config(conf=sc.getConf())
    .getOrCreate()
)

24/03/04 16:07:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
# GCS location of the raw schedule Parquet file for the 2000 season.
raw_url_nfl = "gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2000.parquet"

In [6]:
# Load the Parquet file at raw_url_nfl and print its schema for inspection.
df = spark.read.parquet(raw_url_nfl)
df.printSchema()

                                                                                

root
 |-- timestamp: timestamp_ntz (nullable = true)
 |-- status: string (nullable = true)
 |-- season: struct (nullable = true)
 |    |-- year: long (nullable = true)
 |    |-- type: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- displayName: string (nullable = true)
 |    |-- half: long (nullable = true)
 |-- team: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- abbreviation: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- displayName: string (nullable = true)
 |    |-- clubhouse: string (nullable = true)
 |    |-- color: string (nullable = true)
 |    |-- logo: string (nullable = true)
 |    |-- recordSummary: string (nullable = true)
 |    |-- seasonSummary: string (nullable = true)
 |    |-- standingSummary: string (nullable = true)
 |    |-- groups: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: struct (nullable

In [381]:
# Row count of the raw frame (count() is an action and triggers a Spark job).
# NOTE(review): this cell's execution count (381) is out of sequence, and the
# recorded result (4) differs from the row count of 1 logged for
# nfl_season_2000.parquet further down — re-run from a fresh kernel to confirm
# which value is current.
df.count()

4

In [8]:
# explode() emits one row per element of `events`; each element is placed in the
# new `exp_events` column while the original columns are kept.
exploded_df = df.withColumn('exp_events', F.explode('events'))

In [9]:
# Inspect the schema after the first explode.
exploded_df.printSchema()

root
 |-- timestamp: timestamp_ntz (nullable = true)
 |-- status: string (nullable = true)
 |-- season: struct (nullable = true)
 |    |-- year: long (nullable = true)
 |    |-- type: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- displayName: string (nullable = true)
 |    |-- half: long (nullable = true)
 |-- team: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- abbreviation: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- displayName: string (nullable = true)
 |    |-- clubhouse: string (nullable = true)
 |    |-- color: string (nullable = true)
 |    |-- logo: string (nullable = true)
 |    |-- recordSummary: string (nullable = true)
 |    |-- seasonSummary: string (nullable = true)
 |    |-- standingSummary: string (nullable = true)
 |    |-- groups: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: struct (nullable

In [20]:
# Build the fully exploded frame from `df` in a single chain instead of
# reassigning exploded_df from itself. The self-referential form is not
# idempotent: re-running the cell explodes the already-exploded rows again and
# multiplies the row count.
exploded_df = (
    df
    .withColumn('exp_events', F.explode('events'))
    .withColumn('exp_competitions', F.explode('exp_events.competitions'))
    .withColumn('exp_competitors', F.explode('exp_competitions.competitors'))
)

In [31]:
# Expose the exploded frame to Spark SQL under the name `xform_table_test`
# (replaces any existing temp view with that name).
exploded_df.createOrReplaceTempView('xform_table_test')

In [32]:
# Select exp_events.date, exp_competitors.id and exp_competitors.score.value.
# GROUP BY 1,2,3 groups on all three selected columns with no aggregate, so it
# only collapses duplicate rows (same effect as SELECT DISTINCT).
xform_df_test = spark.sql("""
    SELECT 
        exp_events.date,
        exp_competitors.id,
        exp_competitors.score.value
    FROM
        xform_table_test
    GROUP BY 
        1,2,3
""")
# show() prints the first 20 rows; no ORDER BY is given, so row order is not fixed.
xform_df_test.show()

+-------------------+---+-----+
|               date| id|value|
+-------------------+---+-----+
|2000-10-01 20:15:00| 28| 20.0|
|2000-09-03 17:00:00| 29| 17.0|
|2000-12-03 18:00:00| 28|  7.0|
|2000-12-24 18:00:00| 28| 20.0|
|2000-12-03 18:00:00| 19|  9.0|
|2000-11-05 21:05:00| 28| 15.0|
|2000-10-22 20:15:00| 30| 16.0|
|2000-10-01 20:15:00| 27| 17.0|
|2000-11-20 17:00:00| 14|  0.0|
|2000-10-15 17:00:00| 33|  3.0|
|2000-12-24 18:00:00| 22|  3.0|
|2000-11-20 17:00:00| 28|  0.0|
|2000-12-16 17:30:00| 23| 24.0|
|2000-09-25 00:20:00| 28| 16.0|
|2000-11-26 18:00:00| 28| 20.0|
|2000-10-30 17:00:00| 10|  0.0|
|2000-09-10 20:15:00| 28| 10.0|
|2000-10-30 17:00:00| 28|  0.0|
|2000-09-03 17:00:00| 28| 20.0|
|2000-09-10 20:15:00|  8| 15.0|
+-------------------+---+-----+
only showing top 20 rows



                                                                                

In [67]:
# Function to read a raw Parquet file and explode its nested fields
def get_raw_data_and_explode(file_path):
    """Read one raw Parquet file and add three chained exploded columns.

    The explodes are applied in order: ``events`` -> ``exp_events``,
    ``exp_events.competitions`` -> ``exp_competitions`` and
    ``exp_competitions.competitors`` -> ``exp_competitors``. Each explode
    emits one row per element; the original columns are kept.

    Args:
        file_path: Path or URI handed to ``spark.read.parquet``.

    Returns:
        The DataFrame with the ``exp_events``, ``exp_competitions`` and
        ``exp_competitors`` columns added.
    """
    raw_df = spark.read.parquet(file_path)
    # count() is an action: the file is read once here purely for the log line.
    raw_count = raw_df.count()
    # This runs before any explode, so report it as the raw row count rather
    # than claiming the explode has completed.
    print(f"raw data loaded...here's the row count {raw_count}")
    return (
        raw_df
        .withColumn('exp_events', F.explode('events'))
        .withColumn('exp_competitions', F.explode('exp_events.competitions'))
        .withColumn('exp_competitors', F.explode('exp_competitions.competitors'))
    )

# Empty DataFrame with the (date, id, value) schema; results are accumulated into it
columns = (
    StructType()
    .add("date", TimestampNTZType(), True)
    .add("id", StringType(), True)
    .add("value", DoubleType(), True)
)
empty_RDD = spark.sparkContext.emptyRDD()
processed_df = spark.createDataFrame(empty_RDD, columns)

# Process files: one raw schedule Parquet file per season, 2000 through 2023
file_paths = [
    f'gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_{season}.parquet'
    for season in range(2000, 2024)
]

# Seed the running total from whatever processed_df already holds, so the
# per-file log below never has to recount the whole, ever-growing union.
proc_count = processed_df.count()

for file_path in file_paths:
    print(f"grabbing file...{file_path}")
    exploded_df = get_raw_data_and_explode(file_path)

    # create a temp table
    exploded_df.createOrReplaceTempView('temp')

    # transform the file and save to a df
    # (GROUP BY on all three selected columns with no aggregate only removes
    # duplicate rows)
    xform_df = spark.sql("""
    SELECT 
        exp_events.date,
        exp_competitors.id,
        exp_competitors.score.value
    FROM
        temp
    GROUP BY 
        1,2,3
    """)
    print("xform df row count")
    count = xform_df.count()
    print(f"transforming data complete...here's the row count {count}")

    # union to the processed df
    processed_df = processed_df.unionByName(xform_df)
    # unionByName keeps duplicates, so the new total is the previous total plus
    # this file's rows; calling processed_df.count() here would re-evaluate
    # every file unioned so far on each iteration.
    proc_count += count
    print(f"processsed count...{proc_count}")


grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2000.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32
processsed count...32
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2001.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32
processsed count...64
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2002.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32


                                                                                

processsed count...96
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2003.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32
processsed count...128
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2004.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32


                                                                                

processsed count...160
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2005.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32
processsed count...192
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2006.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32
processsed count...224
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2007.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32
processsed count...256
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2008.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32
processsed count...288
grabbing file...gs://redskins-rule-nfl-game-data/raw/sche

                                                                                

processsed count...576
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2018.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32


                                                                                

processsed count...608
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2019.parquet
exploding data complete...here's the row count 1
xform df row count


                                                                                

transforming data complete...here's the row count 32


                                                                                

processsed count...640
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2020.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 32


                                                                                

processsed count...672
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2021.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 34


                                                                                

processsed count...706
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2022.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 34


                                                                                

processsed count...740
grabbing file...gs://redskins-rule-nfl-game-data/raw/schedule/nfl_season_2023.parquet
exploding data complete...here's the row count 1
xform df row count
transforming data complete...here's the row count 34


[Stage 1511:>                                                       (0 + 1) / 1]

processsed count...774


                                                                                

In [69]:
# Preview the accumulated rows, newest `date` first (show() prints the top 20).
(
    processed_df
    .select('*')
    .sort('date', ascending=False)
    .show()
)

[Stage 1619:>                                                       (0 + 1) / 1]

+-------------------+---+-----+
|               date| id|value|
+-------------------+---+-----+
|2024-01-07 21:25:00| 28| 10.0|
|2024-01-07 21:25:00|  6| 38.0|
|2023-12-31 18:00:00| 28| 10.0|
|2023-12-31 18:00:00| 25| 27.0|
|2023-12-24 18:00:00| 20| 30.0|
|2023-12-24 18:00:00| 28| 28.0|
|2023-12-17 21:05:00| 28| 20.0|
|2023-12-17 21:05:00| 14| 28.0|
|2023-12-03 18:00:00| 28| 15.0|
|2023-12-03 18:00:00| 15| 45.0|
|2023-11-23 21:30:00|  6| 45.0|
|2023-11-23 21:30:00| 28| 10.0|
|2023-11-19 18:00:00| 19| 31.0|
|2023-11-19 18:00:00| 28| 19.0|
|2023-11-12 21:25:00| 26| 29.0|
|2023-11-12 21:25:00| 28| 26.0|
|2023-11-05 18:00:00| 17| 17.0|
|2023-11-05 18:00:00| 28| 20.0|
|2023-10-29 17:00:00| 21| 38.0|
|2023-10-29 17:00:00| 28| 31.0|
+-------------------+---+-----+
only showing top 20 rows



                                                                                