In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import row_number
from pyspark.sql import functions as func

# Obtain the Spark session for this analysis (getOrCreate reuses an existing
# session when there is one).
sparkSess = (
    SparkSession.builder
    .appName('F1Analysis')
    .getOrCreate()
)

# Drivers CSV: the first line supplies the column names; no schema is inferred.
input_df = sparkSess.read.csv('dbfs:/FileStore/tables/drivers.csv', header=True)



In [0]:
# Load the drivers table (header row -> column names), preview the first
# 10 rows, then leave the column list as the cell's displayed value.
driver_df = sparkSess.read.csv('dbfs:/FileStore/tables/drivers.csv', header=True)
driver_df.show(10)
driver_df.columns

+--------+----------+------+----+---------+----------+----------+-----------+--------------------+
|driverId| driverRef|number|code| forename|   surname|       dob|nationality|                 url|
+--------+----------+------+----+---------+----------+----------+-----------+--------------------+
|       1|  hamilton|    44| HAM|    Lewis|  Hamilton|1985-01-07|    British|http://en.wikiped...|
|       2|  heidfeld|    \N| HEI|     Nick|  Heidfeld|1977-05-10|     German|http://en.wikiped...|
|       3|   rosberg|     6| ROS|     Nico|   Rosberg|1985-06-27|     German|http://en.wikiped...|
|       4|    alonso|    14| ALO| Fernando|    Alonso|1981-07-29|    Spanish|http://en.wikiped...|
|       5|kovalainen|    \N| KOV|   Heikki|Kovalainen|1981-10-19|    Finnish|http://en.wikiped...|
|       6|  nakajima|    \N| NAK|   Kazuki|  Nakajima|1985-01-11|   Japanese|http://en.wikiped...|
|       7|  bourdais|    \N| BOU|Sébastien|  Bourdais|1979-02-28|     French|http://en.wikiped...|
|       8|

In [0]:
# Load the races table; the header row supplies the column names.
races_df = sparkSess.read.csv('dbfs:/FileStore/tables/races.csv', header=True)


In [0]:
# Preview the first 20 rows of the races table.
races_df.show(20)

+------+----+-----+---------+--------------------+----------+--------+--------------------+--------+--------+--------+--------+--------+--------+----------+----------+-----------+-----------+
|raceId|year|round|circuitId|                name|      date|    time|                 url|fp1_date|fp1_time|fp2_date|fp2_time|fp3_date|fp3_time|quali_date|quali_time|sprint_date|sprint_time|
+------+----+-----+---------+--------------------+----------+--------+--------------------+--------+--------+--------+--------+--------+--------+----------+----------+-----------+-----------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|      \N|      \N|      \N|      \N|      \N|      \N|        \N|        \N|         \N|         \N|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|09:00:00|http://en.wikiped...|      \N|      \N|      \N|      \N|      \N|      \N|        \N|        \N|         \N|         \N|
|     3|2009|    3|       17|  Chinese G

In [0]:
# Load the race results table; the header row supplies the column names.
results_df = sparkSess.read.csv('dbfs:/FileStore/tables/results.csv', header=True)

In [0]:
# Preview the first 50 result rows, then display the column names as the
# cell's value.
results_df.show(50)
results_df.columns

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|    10|  58|1:34:50.616|     5690616|        39|   2|      1:27.452|        218.300|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|     8|  58|     +5.478|     5696094|        41|   3|      1:27.739|        217.586|       1|
|       3|    18|       3|            3|     7|   7|       3|           3|            3|  

In [0]:
%run "./reader_class"

In [0]:
%run "./transformer_job"

In [0]:
class workFlow:
    """Load the drivers, races and results CSVs and join them into one DataFrame.

    The joined frame is the starting point for the analysis this notebook is
    after: drivers who won consecutive races in 2012, which needs columns from
    all three tables.

    NOTE(review): ``load_data_source`` and ``driver_race_result_join`` are not
    defined in this notebook; presumably they come from the notebooks pulled in
    with ``%run "./reader_class"`` and ``%run "./transformer_job"`` -- confirm.
    """

    # Table name -> CSV file name. The keys are the names handed to
    # ``table_join``; the insertion order is also the load order.
    TABLE_FILES = {
        "drivers": "drivers.csv",
        "races": "races.csv",
        "results": "results.csv",
    }

    def __init__(self, base_path='dbfs:/FileStore/tables'):
        """Remember where the source CSVs live.

        Args:
            base_path: Directory containing the CSV files. The default keeps
                the original hard-coded DBFS location, so ``workFlow()``
                behaves exactly as before.
        """
        # Drop a trailing slash so joining with a file name never doubles it.
        self.base_path = base_path.rstrip('/')

    def _load_table(self, file_name):
        """Read one CSV under ``base_path`` through ``load_data_source``."""
        return load_data_source(
            data_type="csv",
            file_path=f"{self.base_path}/{file_name}",
        ).get_data_frame()

    def runner(self):
        """Load every table in ``TABLE_FILES`` and return their join.

        Returns:
            Whatever ``driver_race_result_join().table_join`` returns for the
            ``{"drivers": ..., "races": ..., "results": ...}`` mapping.
        """
        input_Dfs = {
            table_name: self._load_table(file_name)
            for table_name, file_name in self.TABLE_FILES.items()
        }
        return driver_race_result_join().table_join(input_Dfs)


# Module-level instance used by the following cell to run the workflow.
workflow = workFlow()

In [0]:
# Run the load-and-join workflow; result_df holds whatever table_join returns
# for the drivers/races/results inputs.
result_df = workflow.runner()

Run Start


In [0]:
# Preview the joined frame.
result_df.show()

+--------+------+----------+---------+----------+-----------+----+--------------------+----------+----+--------+------------+-------------+------+
|driverId|raceId| driverRef| forename|   surname|nationality|year|                name|      date|grid|position|positionText|positionOrder|points|
+--------+------+----------+---------+----------+-----------+----+--------------------+----------+----+--------+------------+-------------+------+
|       1|    18|  hamilton|    Lewis|  Hamilton|    British|2008|Australian Grand ...|2008-03-16|   1|       1|           1|            1|    10|
|       2|    18|  heidfeld|     Nick|  Heidfeld|     German|2008|Australian Grand ...|2008-03-16|   5|       2|           2|            2|     8|
|       3|    18|   rosberg|     Nico|   Rosberg|     German|2008|Australian Grand ...|2008-03-16|   7|       3|           3|            3|     6|
|       4|    18|    alonso| Fernando|    Alonso|    Spanish|2008|Australian Grand ...|2008-03-16|  11|       4|      

In [0]:
# Preview only the rows belonging to the 2012 season.
result_df.where(result_df['year'] == 2012).show()

+--------+------+------------------+---------+----------+-----------+----+--------------------+----------+----+--------+------------+-------------+------+
|driverId|raceId|         driverRef| forename|   surname|nationality|year|                name|      date|grid|position|positionText|positionOrder|points|
+--------+------+------------------+---------+----------+-----------+----+--------------------+----------+----+--------+------------+-------------+------+
|      18|   860|            button|   Jenson|    Button|    British|2012|Australian Grand ...|2012-03-18|   2|       1|           1|            1|    25|
|      20|   860|            vettel|Sebastian|    Vettel|     German|2012|Australian Grand ...|2012-03-18|   6|       2|           2|            2|    18|
|       1|   860|          hamilton|    Lewis|  Hamilton|    British|2012|Australian Grand ...|2012-03-18|   1|       3|           3|            3|    15|
|      17|   860|            webber|     Mark|    Webber| Australian|2

In [0]:
# List the raceId of the first 50 rows from the 2012 season.
result_df.where(result_df['year'] == 2012).select('raceId').show(50)

+------+
|raceId|
+------+
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   860|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   861|
|   862|
|   862|
+------+
only showing top 50 rows



In [0]:
# Pick the winner of every 2012 race.
#
# Rows are numbered inside each race by finishing order, so row 1 is the
# winner. Ordering the window by 'date' would not work: every row of a race
# carries the same date, so row_number() would number the rows arbitrarily.
# positionOrder is cast to int so the sort is numeric even if the column is a
# string (no schema is supplied when the CSVs are read -- TODO confirm the
# join step does not already cast it).
window = Window.partitionBy('raceId').orderBy(func.col('positionOrder').cast('int'))
result_df = (
    result_df
    .filter(result_df.year == 2012)
    .withColumn("per_race_winner", row_number().over(window))
)
result_df.filter(result_df.per_race_winner == 1).show()

+--------+------+---------+---------+---------+-----------+----+--------------------+----------+----+--------+------------+-------------+------+-------------+---------------+
|driverId|raceId|driverRef| forename|  surname|nationality|year|                name|      date|grid|position|positionText|positionOrder|points|per_race_data|per_race_winner|
+--------+------+---------+---------+---------+-----------+----+--------------------+----------+----+--------+------------+-------------+------+-------------+---------------+
|      18|   860|   button|   Jenson|   Button|    British|2012|Australian Grand ...|2012-03-18|   2|       1|           1|            1|    25|            1|              1|
|       4|   861|   alonso| Fernando|   Alonso|    Spanish|2012|Malaysian Grand Prix|2012-03-25|   8|       1|           1|            1|    25|            1|              1|
|       3|   862|  rosberg|     Nico|  Rosberg|     German|2012|  Chinese Grand Prix|2012-04-15|   1|       1|           1|  

What does Spark do when we run the code?

It builds a logical execution plan and creates Jobs, Stages and Tasks based on the computations.     
1. A Job in Spark represents a complete computation and is triggered when an action such as 'count()', 'collect()' or 'save()' is called.      
2. Each job is broken down into smaller, manageable units called Stages. A new stage is created wherever data has to be shuffled, i.e. at wide transformations such as 'groupBy()' or 'reduceByKey()'.
3. A Task is the smallest unit of work in Spark. It represents the computation performed on a single partition of the dataset. 