In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

import os
import sys

# Resolve the project root as the parent of the current working directory
# (assumes the notebook is launched from a direct sub-folder of the project — TODO confirm).
current_dir = os.getcwd()
root_dir = os.path.abspath(os.path.join(current_dir, os.pardir))

# Put the root at the front of the import path so the `scripts` package imported
# below can be found. The guard keeps the cell idempotent: re-running it no
# longer stacks a duplicate entry onto sys.path each time.
if not sys.path or sys.path[0] != root_dir:
    sys.path.insert(0, root_dir)

from scripts.common_functions import *

In [2]:
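# NOTE: every line in this cell is commented out, so the cell is a no-op.
# It holds a pandas-based flattening of the raw JSON files that writes 't20_raw.csv'.
# It uses json, pd and hashlib, none of which are imported in the visible import cell.
# data_folder = os.path.join(root_dir, 'data')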
# data = []

# for file_name in os.listdir(data_folder):
#     if file_name.endswith('.json'):
#         file_path = os.path.join(data_folder,file_name)
#         print(file_path)
        
#         with open(file_path, 'r') as f:
#             json_payload = json.load(f)

#         data.append({"file_path":file_path,"json_payload":json_payload})

# df = pd.DataFrame(data)

# df.head()

# # Level 1 normalization
# df_normalized = pd.json_normalize(df['json_payload'], sep="_")
# df_normalized.head()

# # Creating a match_id before explosion
# # df_normalized['match_id'] = df_normalized['innings'].apply(lambda x : hashlib.sha256(str(x).encode()).hexdigest())
# # First explosion due to 2 innings per match
# exploded_df = df_normalized.explode('innings')
# exploded_df.head()

# # Collecting Key columns
# df_meta = exploded_df[['info_dates', 'info_event_name', 'info_gender', 'info_match_type']]
# # Level 2 normalization
# df_final = pd.json_normalize(exploded_df['innings'], sep="_")
# final_df = pd.concat([df_meta.reset_index(drop=True), df_final.reset_index(drop=True)], axis=1)
# final_df.head()


# # Writing to final csv file
# table_name = 't20_raw.csv'
# folder_path = os.path.join(root_dir,'tables')

# if not os.path.exists(folder_path):
#     os.makedirs(folder_path)
#     print(f"Folder '{folder_path}' created.")
# else:
#     print(f"Folder '{folder_path}' already exists.")

# final_df.to_csv(os.path.join(folder_path,table_name), index=False)

In [3]:
# Location of the cleansed matches table under the project root.
table_name = 't20_matches_cleansed.csv'
folder_path = os.path.join(root_dir,'tables')

# Create the session exactly once. A second call with a different app name only
# rebinds `spark`; the name that ended up bound here is the one kept.
# NOTE(review): create_spark_session is presumably provided by
# scripts.common_functions — confirm how it treats an already-running session.
spark = create_spark_session("raw_to_cleansed_innings")

# header=True takes column names from the first row. No schema is supplied and
# inferSchema is not enabled, so every column is read as a string.
# NOTE(review): if this CSV was written by pandas, embedded double quotes are
# escaped by doubling, while Spark's reader defaults to a backslash escape —
# confirm whether escape='"' is needed for the nested `overs` column.
df = spark.read.csv(os.path.join(folder_path, table_name), header=True)
df.show()

+--------------------+--------------------+----------+--------------------+------+----------+-----------+------------+-----------+-----------+-----------------+----------------+--------------------+--------------------+
|            match_id|          innings_id|match_date|          event_name|gender|match_type|       team|target_overs|target_runs|absent_hurt|penalty_runs_post|penalty_runs_pre|               overs|          powerplays|
+--------------------+--------------------+----------+--------------------+------+----------+-----------+------------+-----------+-----------+-----------------+----------------+--------------------+--------------------+
|f45476aade5497567...|8c032156156ba7fda...|2017-02-17|Sri Lanka in Aust...|  male|       T20|  Australia|        NULL|       NULL|       NULL|             NULL|            NULL|[{'over': 0, 'del...|[{'from': 0.1, 't...|
|f45476aade5497567...|ccf7f3f9c2c871888...|2017-02-17|Sri Lanka in Aust...|  male|       T20|  Sri Lanka|        20.0|  

In [4]:
# Keep only the innings key and the raw `overs` payload column.
df_overs = df.select(col('innings_id'), col('overs'))

In [5]:
# DDL schema for the `overs` column: an array of overs, each holding its over
# number and an array of deliveries. Adjacent literals are concatenated by the
# parser, so the resulting string is identical to the single-line form.
overs_schema = (
    'ARRAY<STRUCT<'
    'over: BIGINT,'
    'deliveries: ARRAY<STRUCT<'
    'batter: STRING,'
    'bowler: STRING,'
    'non_striker: STRING,'
    'runs: STRUCT<batter: BIGINT,extras: BIGINT,total: BIGINT>,'
    'extras: STRUCT<wides: BIGINT,byes: BIGINT,legbyes: BIGINT,noballs: BIGINT,penalty: BIGINT>'
    '>>'
    '>>'
)

In [6]:
# Flatten the nested `overs` payload into one row per delivery.
#
# The chain starts from `df` rather than from the previous `df_overs` binding.
# The earlier form overwrote `df_overs` with a frame that no longer had an
# `overs` column, so running the cell a second time failed. Built this way the
# cell is idempotent and still leaves the result in `df_overs`.
df_overs = (
    df.select('innings_id', 'overs')
    # Parse the string payload using the DDL schema in `overs_schema`.
    .withColumn('overs_parsed', from_json(col('overs'), overs_schema))
    # One row per over. explode() emits no row for a null or empty array, so an
    # innings whose payload fails to parse disappears here without an error.
    .withColumn('overs_exploded', explode(col('overs_parsed')))
    .select('innings_id', 'overs_exploded.over', 'overs_exploded.deliveries')
    # One row per delivery within each over.
    .withColumn('deliveries_exploded', explode('deliveries'))
    .select(
        col('innings_id'),
        col('over'),
        col('deliveries_exploded.batter'),
        col('deliveries_exploded.bowler'),
        col('deliveries_exploded.non_striker'),
        # The nested `runs` struct is flattened under distinct names because
        # `batter` and `extras` also exist as delivery-level fields.
        col('deliveries_exploded.runs.batter').alias('runs_off_bat'),
        col('deliveries_exploded.runs.extras').alias('runs_extra'),
        col('deliveries_exploded.runs.total').alias('total_runs'),
        col('deliveries_exploded.extras.wides').alias('wides'),
        col('deliveries_exploded.extras.byes').alias('byes'),
        col('deliveries_exploded.extras.legbyes').alias('legbyes'),
        col('deliveries_exploded.extras.noballs').alias('noballs'),
        col('deliveries_exploded.extras.penalty').alias('penalty'),
    )
)
df_overs.show(truncate=True)

+--------------------+----+---------+---------------+-----------+------------+----------+----------+-----+----+-------+-------+-------+
|          innings_id|over|   batter|         bowler|non_striker|runs_off_bat|runs_extra|total_runs|wides|byes|legbyes|noballs|penalty|
+--------------------+----+---------+---------------+-----------+------------+----------+----------+-----+----+-------+-------+-------+
|8c032156156ba7fda...|   0| AJ Finch|     SL Malinga|  M Klinger|           0|         0|         0| NULL|NULL|   NULL|   NULL|   NULL|
|8c032156156ba7fda...|   0| AJ Finch|     SL Malinga|  M Klinger|           0|         0|         0| NULL|NULL|   NULL|   NULL|   NULL|
|8c032156156ba7fda...|   0| AJ Finch|     SL Malinga|  M Klinger|           1|         0|         1| NULL|NULL|   NULL|   NULL|   NULL|
|8c032156156ba7fda...|   0|M Klinger|     SL Malinga|   AJ Finch|           2|         0|         2| NULL|NULL|   NULL|   NULL|   NULL|
|8c032156156ba7fda...|   0|M Klinger|     SL Mal

In [7]:
# Match- and innings-level descriptor columns taken from the loaded table.
df_meta = df.select(
    'match_id',
    'innings_id',
    'match_date',
    'event_name',
    'gender',
    'match_type',
    'team',
)

# Inner join on the shared innings key: only innings present in both frames remain.
df_final = df_meta.join(df_overs, on='innings_id', how='inner')

df_final.show()

+--------------------+--------------------+----------+--------------------+------+----------+---------+----+---------+---------------+-----------+------------+----------+----------+-----+----+-------+-------+-------+
|          innings_id|            match_id|match_date|          event_name|gender|match_type|     team|over|   batter|         bowler|non_striker|runs_off_bat|runs_extra|total_runs|wides|byes|legbyes|noballs|penalty|
+--------------------+--------------------+----------+--------------------+------+----------+---------+----+---------+---------------+-----------+------------+----------+----------+-----+----+-------+-------+-------+
|9a5bc3f65795a11d8...|d25211a68f230b803...|2017-02-19|Sri Lanka in Aust...|  male|       T20|Australia|   0|M Klinger|     SL Malinga|   AJ Finch|           0|         0|         0| NULL|NULL|   NULL|   NULL|   NULL|
|9a5bc3f65795a11d8...|d25211a68f230b803...|2017-02-19|Sri Lanka in Aust...|  male|       T20|Australia|   0|M Klinger|     SL Maling