# Laboratory 1

In [50]:
from pyspark.sql import SparkSession

## Spark Session Initialization

Here is where the SparkSession object is created and initializated. This has the same effect as going to the console and type *pyspark*. Just that here is a Python object that is created that we can start to utilice for our programs. You can see that the response of the object has the version of the instaled *spark*, where is the master located, the app that is currently running and a link to the UI for more information of the system.

In [51]:
spark = SparkSession\
    .builder\
    .appName('Lab01')\
    .getOrCreate()
spark

## Getting the data

After we get the *spark* object, we can start to use all of the commands, objects and functions of the system. In this case we need to get the data from the folder flight-data, and create a DataFrame with that data. For that we use the function of spark **read**, that allows us to read some type of data (csv, txt, parquet, etc.), next we can give some options to the **read** function, in this specific problem, we give 2 configuration. 

* The first one *inferSchema*, is an instruction to the **read** function that it need to infer the data type of each column from the same data that is reading.
* The second one *header*, is an instruction indicating that the data, in the first row, has the name for each of the columns.

Then the last intruction to the **read** function is to indicate what type of file we are reading and in what route is located that file, for this case that we need to read a *csv* we use that function and indicate the path of the file.

In [52]:
flightData2015 = spark\
    .read\
    .option('inferSchema', 'True')\
    .option('header', 'True')\
    .csv(r'../../../data/flight-data/csv/2015-summary.csv')

## Lazy Evaluation and Functions

With the Dataframe created we now have the first intruction of the plan for the DataFrame. If you remember what you have read, *Spark* is lazy and doesnt excecutes any of the transformation or processes until is necessary. The excecution of the pipeline created by *Spark* is when: 

* Is necessary to view data in the console.
* Is necessary to collect data to native objects in the respective language.
* Is necessary to write to output data sources.

So, if we get the *Physical Plan* from the Dataframe object, we will get just the FileScan (This is just the name of the read function in the *Physical Plan*) for the flight-data file.

In [66]:
flightData2015.explain()

== Physical Plan ==
FileScan csv [DEST_COUNTRY_NAME#227,ORIGIN_COUNTRY_NAME#228,count#229] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/jorge/OneDrive/Mis Documentos/Projects/Spark-The-Defini..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




Next, we can use the function **take** to view the data in the console. This will be an action and activates the pipeline created by *Spark* for the Dataframe and it will show us the response in the console

In [67]:
flightData2015.take(5)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

Another function that doesn't get us a response is the function **sort**, if we see the response in the console if not data, but an object and if we see the *Physical Plan* of the DataFrame we'll see that anothe 2 steps were added to the Plan.

In [68]:
flightData2015.sort("count")

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

In [69]:
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#229 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#229 ASC NULLS FIRST, 15000), ENSURE_REQUIREMENTS, [plan_id=619]
      +- FileScan csv [DEST_COUNTRY_NAME#227,ORIGIN_COUNTRY_NAME#228,count#229] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/jorge/OneDrive/Mis Documentos/Projects/Spark-The-Defini..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




When we use the function **sort** that is a wide transformation, by default the function will create 200 partitions of the data, we can change that configuration by changing the value in the configuration variable *spark.sql.shuffle.partitions* to the number of partitions that we need.

In [70]:
spark.conf.set("spark.sql.shuffle.partitions", "15000")
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

## DataFrames and SQL

Another advantage of *Spark* is that we dont need to only excecute programs in Python, we can also use the SQL language to create *Physical Plans* for our transformations. For this is necesary to create the table in the Data Base motor, or in this case we'll create a *Temporary View* that allows us to access the data with SQL while the Spark Session is turn on, in the moment that we stop the Spark Session, the data in the *Temporary View* will desappear. 

In [71]:
flightData2015.createOrReplaceTempView('flight_data_2015')

Next we'll get the number of times a country is reapeated in the Data, doing it with *SQL* and *Python* to see if the *Physical Plans* of the two tranformations paths are equal.

In [72]:
sqlWay = spark.sql("""
    SELECT
        DEST_COUNTRY_NAME,
        COUNT(1)
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
    .groupBy('DEST_COUNTRY_NAME')\
    .count()

And if we see the response of the *explain* to each of the DataFrames we'll see that the *Physical Plans* of each one of them is exactly the same.

In [73]:
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#227], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#227, 15000), ENSURE_REQUIREMENTS, [plan_id=641]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#227], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#227] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/jorge/OneDrive/Mis Documentos/Projects/Spark-The-Defini..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#227], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#227, 15000), ENSURE_REQUIREMENTS, [plan_id=654]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#227], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#227] Batched: false, DataFilters: [], Format: CSV

Now that we know how does *Spark* work internally, we can start getting some interesting information from the data that we have. In this case we need to know what is the maximum number of flights to and from any location. For this we'll need to use the function **max**.

In [74]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [75]:
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

But this only gives us the maximum number of all the data, if we need to get the maximum of each one of the countries, we'll need the variable *DEST_COUNTRY_NAME* to used it as a varible for agruppation. When we use the function **groupBy** in SQL or Python, what we are doing is to apply the specifyc function to each a every value that the variable to group by has. In this case we'll group by the variable *DEST_COUNTRY_NAME* to the the **sum** of the number of flights there.

In [76]:
maxSql = spark.sql("""
    SELECT 
        DEST_COUNTRY_NAME, 
        sum(count) as destination_total
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
    ORDER BY sum(count) DESC
    LIMIT 5
""")
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [77]:
from pyspark.sql.functions import desc
flightData2015.groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



And if the see the *Physical Plan* we'll see that:

1. It scans the data from the CSV.
2. Then agregates the data in a partial sum of the data in the actual partitions.
3. Then reorganice the data in the partitions for each agrupation
4. Then get the final sum for each partition
5. And finally it sorts the information from higher to lower.

In [78]:
flightData2015.groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#417L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#227,destination_total#417L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#227], functions=[sum(count#229)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#227, 15000), ENSURE_REQUIREMENTS, [plan_id=824]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#227], functions=[partial_sum(count#229)])
            +- FileScan csv [DEST_COUNTRY_NAME#227,count#229] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/jorge/OneDrive/Mis Documentos/Projects/Spark-The-Defini..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


