In [1]:
# Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .enableHiveSupport()
    .appName("Pipeline demo")
    .getOrCreate()
)

24/07/22 15:05:31 WARN Utils: Your hostname, leo-HP-Pavilion-Power-Laptop-15-cb0xx resolves to a loopback address: 127.0.1.1; using 192.168.1.11 instead (on interface wlo1)
24/07/22 15:05:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/22 15:05:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Create tables

# raw_club_elo
spark.sql("drop table if exists bronze.raw_club_elo")
spark.sql(
    """
    create table if not exists bronze.raw_club_elo(
    Rank long,
    Club string,
    Country string,
    Level int,
    Elo double,
    From date,
    To date,
    file_modification_time timestamp
    )
    """
)

# raw_match_info
spark.sql("drop table if exists bronze.raw_match_info")
spark.sql(
    """
    create table if not exists bronze.raw_match_info(
    match_id int,
    home_team string,
    away_team string,
    venue string,
    date date,
    file_modification_time timestamp
    )
    """
)

# raw_match_summary
spark.sql("drop table if exists bronze.raw_match_summary")
spark.sql(
    """
    create table if not exists bronze.raw_match_summary(
    match_id int,
    team_id string,
    goals int,
    shots int,
    possession int,
    file_modification_time timestamp
    )
    """
)

# calculated summary
spark.sql("drop table if exists silver.calculated_summary")
spark.sql(
    """
    create table if not exists silver.calculated_summary(
    venue string,
    date date,
    home_team string,
    home_elo double,
    home_goals int,
    home_shots int,
    home_possession int,
    away_team string,
    away_elo double,
    away_goals int,
    away_shots int,
    away_possession int
    )
    """
)

24/07/22 15:11:39 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.
24/07/22 15:11:39 WARN HiveMetaStore: Location: file:/home/leo/PycharmProjects/rb-case-study/spark-warehouse/bronze.db/raw_club_elo specified for non-external table:raw_club_elo
24/07/22 15:11:40 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.
24/07/22 15:11:40 WARN HiveMetaStore: Location: file:/home/leo/PycharmProjects/rb-case-study/spark-warehouse/bronze.db/raw_match_info specified for non-external table:raw_match_info
24/07/22 15:11:40 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.cre

DataFrame[]

In [9]:
import os
import pathlib

# current state of ingest club_elo folder
root = os.getcwd()
ingest_folder = f"{root}/data/raw/club_elo_api"
print(os.listdir(ingest_folder))

['2024-07-21_15-48-39.csv', '2024-07-21_21-25-42.csv', '2024-07-21_21-25-23.csv', '2024-07-21_16-04-35.csv', '2024-07-21_15-51-05.csv']


In [6]:
# every table is empty
print(
    spark.table("bronze.raw_club_elo").count(),
    spark.table("bronze.raw_match_info").count(),
    spark.table("bronze.raw_match_summary").count(),
    spark.table("silver.calculated_summary").count()
)

0 0 0 0


In [11]:
from datetime import date, datetime
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, DateType, TimestampType

In [12]:
# mock data raw_match_info
schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("home_team", StringType(), True),
    StructField("away_team", StringType(), True),
    StructField("venue", StringType(), True),
    StructField("date", DateType(), True),
    StructField("file_modification_time", TimestampType(), True)
])
data = [
    (1, "Lille", "Lens", "Pierre Mauroy", date(2024, 3, 29), datetime.now()),
    (2, "Lens", "Lille", "Bollaert", date(2023, 8, 10), datetime.now())
]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
df.write.mode("overwrite").insertInto("bronze.raw_match_info", overwrite=True)

                                                                                

In [14]:
spark.table("bronze.raw_match_info").show()

+--------+---------+---------+-------------+----------+----------------------+
|match_id|home_team|away_team|        venue|      date|file_modification_time|
+--------+---------+---------+-------------+----------+----------------------+
|       2|     Lens|    Lille|     Bollaert|2023-08-10|  2024-07-22 15:20:...|
|       1|    Lille|     Lens|Pierre Mauroy|2024-03-29|  2024-07-22 15:20:...|
+--------+---------+---------+-------------+----------+----------------------+



In [13]:
# mock data raw_match_summary
schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("team_id", StringType(), True),
    StructField("goals", IntegerType(), True),
    StructField("shots", IntegerType(), True),
    StructField("possession", IntegerType(), True),
    StructField("file_modification_time", TimestampType(), True)
])
data = [
    (1, "Lille", 2, 15, 49, datetime.now()),
    (2, "Lille", 1, 8, 56, datetime.now()),
    (1, "Lens", 1, 9, 51, datetime.now()),
    (2, "Lens", 1, 14, 4, datetime.now()),
]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
df.write.mode("overwrite").insertInto("bronze.raw_match_summary", overwrite=True)

In [15]:
spark.table("bronze.raw_match_summary").show()

+--------+-------+-----+-----+----------+----------------------+
|match_id|team_id|goals|shots|possession|file_modification_time|
+--------+-------+-----+-----+----------+----------------------+
|       1|  Lille|    2|   15|        49|  2024-07-22 15:21:...|
|       2|   Lens|    1|   14|         4|  2024-07-22 15:21:...|
|       1|   Lens|    1|    9|        51|  2024-07-22 15:21:...|
|       2|  Lille|    1|    8|        56|  2024-07-22 15:21:...|
+--------+-------+-----+-----+----------+----------------------+



In [17]:
# step 1: Ingest
from src.workloads.club_elo_api import etl as club_elo_api_etl

club_elo_api_etl("Lille", ingest_folder)
club_elo_api_etl("Lens", ingest_folder)

In [18]:
print(os.listdir(ingest_folder))

['2024-07-21_15-48-39.csv', '2024-07-21_21-25-42.csv', '2024-07-22_15-23-12.csv', '2024-07-22_15-22-57.csv', '2024-07-21_21-25-23.csv', '2024-07-21_16-04-35.csv', '2024-07-21_15-51-05.csv']


In [19]:
# step 2: Feed raw_club_elo
from src.workloads.club_elo import etl as club_elo_etl

club_elo_etl(spark, ingest_folder)

                                                                                

In [21]:
spark.table("bronze.raw_club_elo").show()

+----+----+-------+-----+-------------+----------+----------+----------------------+
|Rank|Club|Country|Level|          Elo|      From|        To|file_modification_time|
+----+----+-------+-----+-------------+----------+----------+----------------------+
|null|Lens|    FRA|    1|1341.29150391|1945-07-01|1945-08-26|  2024-07-22 15:23:...|
|null|Lens|    FRA|    1|1351.79162598|1945-08-27|1945-09-02|  2024-07-22 15:23:...|
|null|Lens|    FRA|    1|1362.09765625|1945-09-03|1945-09-09|  2024-07-22 15:23:...|
|null|Lens|    FRA|    1|1358.89404297|1945-09-10|1945-09-16|  2024-07-22 15:23:...|
|null|Lens|    FRA|    1| 1352.7779541|1945-09-17|1945-09-23|  2024-07-22 15:23:...|
|null|Lens|    FRA|    1| 1361.4765625|1945-09-24|1945-09-30|  2024-07-22 15:23:...|
|null|Lens|    FRA|    1|1371.03283691|1945-10-01|1945-10-07|  2024-07-22 15:23:...|
|null|Lens|    FRA|    1|1380.92358398|1945-10-08|1945-10-14|  2024-07-22 15:23:...|
|null|Lens|    FRA|    1|1372.29541016|1945-10-15|1945-10-21|  20

In [22]:
# step 3: feed calculated_summary
from src.workloads.calculated_summary import etl as calculated_summary_etl

calculated_summary_etl(spark)

In [23]:
spark.table("silver.calculated_summary").show()

+-------------+----------+---------+-------------+----------+----------+---------------+---------+-------------+----------+----------+---------------+
|        venue|      date|home_team|     home_elo|home_goals|home_shots|home_possession|away_team|     away_elo|away_goals|away_shots|away_possession|
+-------------+----------+---------+-------------+----------+----------+---------------+---------+-------------+----------+----------+---------------+
|     Bollaert|2023-08-10|     Lens|1759.54455566|         1|        14|              4|    Lille|1698.28527832|         1|         8|             56|
|Pierre Mauroy|2024-03-29|    Lille|1739.51953125|         2|        15|             49|     Lens|1747.44165039|         1|         9|             51|
+-------------+----------+---------+-------------+----------+----------+---------------+---------+-------------+----------+----------+---------------+

