In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("IPL Data Ingestion").getOrCreate()
spark


In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("IPL Data Pipeline").getOrCreate()


In [0]:
from pyspark.sql.types import StringType
string_schema = StringType()


In [0]:
from pyspark.sql.types import StructType, StructField, StringType
string_schema = StructType([
    StructField("id", StringType(), True),
    StructField("season", StringType(), True),
    StructField("city", StringType(), True),
    StructField("date", StringType(), True),
    StructField("match_type", StringType(), True),
    StructField("player_of_match", StringType(), True),
    StructField("venue", StringType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("toss_decision", StringType(), True),
    StructField("winner", StringType(), True),
    StructField("result", StringType(), True),
    StructField("result_margin", StringType(), True),
    StructField("target_runs", StringType(), True),
    StructField("target_overs", StringType(), True),
    StructField("super_over", StringType(), True),
    StructField("method", StringType(), True),
    StructField("umpire1", StringType(), True),
    StructField("umpire2", StringType(), True)
])
ipl_raw_data_string = spark.read.option("header", "true").schema(string_schema).csv("/FileStore/tables/matches.csv")
ipl_raw_data_string.show(5)


+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|              winner| result|result_margin|target_runs|target_overs|super_over|method|  umpire1|       umpire2|
+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|335982|2007/08| Bangalore|2008-04-18|    League|    BB McCullum|M Chinnaswamy Sta...|Royal Challengers...|Kolkata Knight Ri...|Royal Challengers...|        field|Kolkata Knig

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
struct_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("season", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("date", DateType(), True),
    StructField("match_type", StringType(), True),
    StructField("player_of_match", StringType(), True),
    StructField("venue", StringType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("toss_decision", StringType(), True),
    StructField("winner", StringType(), True),
    StructField("result", StringType(), True),
    StructField("result_margin", IntegerType(), True),
    StructField("target_runs", IntegerType(), True),
    StructField("target_overs", IntegerType(), True),
    StructField("super_over", StringType(), True),
    StructField("method", StringType(), True),
    StructField("umpire1", StringType(), True),
    StructField("umpire2", StringType(), True)
])

ipl_raw_data_struct = spark.read.option("header", "true").schema(struct_schema).csv("/FileStore/tables/matches.csv")
ipl_raw_data_struct.show(5)


+------+------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id|season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|              winner| result|result_margin|target_runs|target_overs|super_over|method|  umpire1|       umpire2|
+------+------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|335982|  null| Bangalore|2008-04-18|    League|    BB McCullum|M Chinnaswamy Sta...|Royal Challengers...|Kolkata Knight Ri...|Royal Challengers...|        field|Kolkata Knight R

In [0]:

raw_data_check = spark.read.option("header", "true").csv("/FileStore/tables/matches.csv")
raw_data_check.show(5)


+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|              winner| result|result_margin|target_runs|target_overs|super_over|method|  umpire1|       umpire2|
+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|335982|2007/08| Bangalore|2008-04-18|    League|    BB McCullum|M Chinnaswamy Sta...|Royal Challengers...|Kolkata Knight Ri...|Royal Challengers...|        field|Kolkata Knig

In [0]:

ipl_raw_data_struct_clean = ipl_raw_data_struct.filter(ipl_raw_data_struct["season"].isNotNull())
ipl_raw_data_struct_clean.show(5)


+------+------+--------------+----------+----------+---------------+----------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+-----------+-----------+
|    id|season|          city|      date|match_type|player_of_match|           venue|               team1|               team2|         toss_winner|toss_decision|              winner| result|result_margin|target_runs|target_overs|super_over|method|    umpire1|    umpire2|
+------+------+--------------+----------+----------+---------------+----------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+-----------+-----------+
|392181|  2009|     Cape Town|2009-04-18|    League|   SR Tendulkar|        Newlands| Chennai Super Kings|      Mumbai Indians| Chennai Super Kings|        field|      Mumbai Indian

In [0]:

ipl_raw_data_struct_fixed = ipl_raw_data_struct.withColumn("season", ipl_raw_data_struct["season"].cast(IntegerType()))
ipl_raw_data_struct_fixed.show(5)


+------+------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id|season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|              winner| result|result_margin|target_runs|target_overs|super_over|method|  umpire1|       umpire2|
+------+------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|335982|  null| Bangalore|2008-04-18|    League|    BB McCullum|M Chinnaswamy Sta...|Royal Challengers...|Kolkata Knight Ri...|Royal Challengers...|        field|Kolkata Knight R

In [0]:

string_schema = "id STRING, season STRING, city STRING, date STRING, match_type STRING, player_of_match STRING, venue STRING, team1 STRING, team2 STRING, toss_winner STRING, toss_decision STRING, winner STRING, result STRING, result_margin STRING, target_runs STRING, target_overs STRING, super_over STRING, method STRING, umpire1 STRING, umpire2 STRING"
raw_data_string = spark.read.schema(string_schema).csv('/FileStore/tables/matches.csv')
raw_data_string.show(5, truncate=False)


+------+-------+----------+----------+----------+---------------+------------------------------------------+---------------------------+---------------------------+---------------------------+-------------+---------------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|id    |season |city      |date      |match_type|player_of_match|venue                                     |team1                      |team2                      |toss_winner                |toss_decision|winner                     |result |result_margin|target_runs|target_overs|super_over|method|umpire1  |umpire2       |
+------+-------+----------+----------+----------+---------------+------------------------------------------+---------------------------+---------------------------+---------------------------+-------------+---------------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|id    |season |city     

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
structured_schema = StructType([
    StructField("id", StringType(), True),
    StructField("season", StringType(), True),
    StructField("city", StringType(), True),
    StructField("date", StringType(), True),
    StructField("match_type", StringType(), True),
    StructField("player_of_match", StringType(), True),
    StructField("venue", StringType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("toss_decision", StringType(), True),
    StructField("winner", StringType(), True),
    StructField("result", StringType(), True),
    StructField("result_margin", IntegerType(), True),
    StructField("target_runs", IntegerType(), True),
    StructField("target_overs", IntegerType(), True),
    StructField("super_over", StringType(), True),
    StructField("method", StringType(), True),
    StructField("umpire1", StringType(), True),
    StructField("umpire2", StringType(), True)
])
raw_data_struct = spark.read.schema(structured_schema).csv('/FileStore/tables/matches.csv')
raw_data_struct.show(5)


+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|              winner| result|result_margin|target_runs|target_overs|super_over|method|  umpire1|       umpire2|
+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|            

In [0]:

cleaned_data = raw_data_struct.dropna(subset=["id", "season", "city"])
cleaned_data.show(5)


+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|              winner| result|result_margin|target_runs|target_overs|super_over|method|  umpire1|       umpire2|
+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|            

In [0]:
from pyspark.sql.functions import col

cleaned_data = cleaned_data.withColumn("target_runs", col("target_runs").cast(IntegerType()))
cleaned_data.show(5)


+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|              winner| result|result_margin|target_runs|target_overs|super_over|method|  umpire1|       umpire2|
+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|            

In [0]:
cleaned_data.write.option("header", "true").csv("/FileStore/tables/cleaned_matches.csv")


In [0]:
cleaned_data.write.format("delta").save("/FileStore/tables/cleaned_matches_delta")


In [0]:
delta_df = spark.read.format("delta").load("/FileStore/tables/cleaned_matches_delta")
delta_df.show(5)


+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|              winner| result|result_margin|target_runs|target_overs|super_over|method|  umpire1|       umpire2|
+------+-------+----------+----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+-----------+------------+----------+------+---------+--------------+
|    id| season|      city|      date|match_type|player_of_match|               venue|               team1|               team2|         toss_winner|toss_decision|            