# Attention

Download your notebook changes (File > Download) before you leave or whenever you will be inactive for a while: Binder instances shut down after 30 minutes of inactivity!

# PySpark data exploration

In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
spark = (
    SparkSession.builder
    # Name to identify your experiment in the cluster's dashboard:
    .appName("big data course")
    # Cluster manager to connect to; "local[*]" runs Spark inside this process using all CPU cores.
    # Cannot be "local[...]" if you want to use your company's cluster (use its master URL, e.g. "yarn").
    .master("local[*]")
    # Maximum total size of serialized results an action (e.g. collect()) may send back to the driver:
    .config("spark.driver.maxResultSize", "4g")
    # How much memory can be allocated to the driver (master/orchestrator):
    .config("spark.driver.memory", "1g")
    # How many executors to request for the experiment (ignored in local mode):
    .config("spark.executor.instances", "4")
    # Alternatively, spin up more executors when the load increases and release them when it drops:
    # .config("spark.dynamicAllocation.enabled", True)
    # .config("spark.dynamicAllocation.minExecutors", 1)
    # .config("spark.dynamicAllocation.maxExecutors", 4)
    # How much memory can be allocated to each executor:
    .config("spark.executor.memory", "1g")
    # How many CPU cores each executor can use to run tasks in parallel (useful for shuffling data etc.):
    # .config("spark.executor.cores", 4)
    # Kryo is usually faster and more compact than the default Java serializer:
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)
spark
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/27 08:51:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Clarification on the "Spark UI" link that doesn't work here.

In [3]:
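# Use the hub host of the Binder server actually running this session (visible in your browser's
# address bar); the generic "https://hub.mybinder.org" did not match it here, this session ran on gke2:
binder_url = "https://hub.gke2.mybinder.org" + os.environ["JUPYTERHUB_SERVICE_PREFIX"]
binder_url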

'https://hub.gke2.mybinder.org/user/pythonpredictio--pyspark-binder-bynl6fxc/'

In [4]:
external_spark_ui_url = binder_url + "proxy/4040"
external_spark_ui_url

'https://hub.gke2.mybinder.org/user/pythonpredictio--pyspark-binder-bynl6fxc/proxy/4040'

Normally, you should be able to visit the Spark UI by entering the above URL in a new browser tab. If a token is requested, you can copy-paste it from the output of the `jupyter server list` cell below. In practice, though, you will probably get an HTTP 4xx or 5xx error.

This *should* work, since the Docker container of this Binder project uses jupyter-server-proxy to expose ports that are listened on inside the container under the external Binder URL. See the documentation: https://jupyter-server-proxy.readthedocs.io/en/latest/arbitrary-ports-hosts.html
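The URL pattern itself is simple: jupyter-server-proxy serves a port listened on inside the container at `<notebook base URL>/proxy/<port>`, which is what the cells above assemble by hand. A small hypothetical helper (not part of any library) makes the pieces explicit and avoids doubled slashes; the hub host is passed in because it depends on which Binder server runs your session.

```python
import os

def proxied_port_url(hub_url: str, service_prefix: str, port: int) -> str:
    """Build the external jupyter-server-proxy URL for a port listened on inside the container.

    hub_url: e.g. "https://hub.gke2.mybinder.org"
    service_prefix: the value of JUPYTERHUB_SERVICE_PREFIX, e.g. "/user/<name>/"
    """
    parts = [hub_url.rstrip("/"), service_prefix.strip("/"), "proxy", str(port)]
    return "/".join(part for part in parts if part)  # skip empty parts to avoid "//"

# Falls back to "/" when not running under JupyterHub (e.g. a local Jupyter server).
prefix = os.environ.get("JUPYTERHUB_SERVICE_PREFIX", "/")
print(proxied_port_url("https://hub.gke2.mybinder.org", prefix, 4040))
```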

Of course, it *doesn't* work.
There seems to be an issue with proxying the PySpark UI port 4040: we cannot reach it externally, but we can internally, from within our container (see the next cells). This appears to be caused by how the Binder container is configured.

So let's accept that with this Binder container we will not be able to check the Spark UI while experimenting. Do check it on your company's cluster once you move from this toy environment to "the real thing".
For now, let's just use the Binder container to play around with PySpark.

In [5]:
! jupyter server list

Currently running servers:
http://jupyter-pythonpredictio-2d-2dpyspark-2dbinder-2dbynl6fxc:8888/user/pythonpredictio--pyspark-binder-bynl6fxc/?token=1SS4JF0XSPKnw3L-cMBe2Q :: /home/jovyan


In [6]:
# url used internally within this binder's docker container:
spark.sparkContext.uiWebUrl

'http://jupyter-pythonpredictio-2d-2dpyspark-2dbinder-2dbynl6fxc:4040'

In [7]:
# This isn't great, but with this hack we can see that the Spark UI *is* live:
import requests
from IPython.display import HTML

def render_local_spark_ui(subpage="jobs"):
    """Fetch a Spark UI page from inside the container and render its HTML in the notebook."""
    subpage_url = spark.sparkContext.uiWebUrl + ("" if subpage is None else "/" + subpage)
    print(subpage_url)
    response = requests.get(subpage_url, timeout=10)
    response.raise_for_status()  # fail loudly instead of rendering an error page
    return HTML(data=response.text)

render_local_spark_ui(subpage="jobs")

http://jupyter-pythonpredictio-2d-2dpyspark-2dbinder-2dbynl6fxc:4040/jobs
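Scraping the HTML works, but the Spark UI also serves a monitoring REST API under `/api/v1` (described in Spark's "Monitoring and Instrumentation" guide) that returns JSON, which is easier to inspect from Python. A minimal sketch using only the standard library; the helper names are hypothetical, and the endpoint layout `/api/v1/applications/<app-id>/jobs` is assumed as documented for Spark 3.x.

```python
import json
import urllib.request

def spark_rest_url(ui_web_url: str, app_id: str, endpoint: str = "jobs") -> str:
    """URL of a Spark monitoring REST endpoint (e.g. "jobs" or "stages") for one application."""
    return f"{ui_web_url.rstrip('/')}/api/v1/applications/{app_id}/{endpoint}"

def fetch_json(url: str, timeout: float = 10.0):
    """GET a URL and decode its JSON body."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)

# Usage inside this notebook (needs the running `spark` session from above):
# jobs = fetch_json(spark_rest_url(spark.sparkContext.uiWebUrl, spark.sparkContext.applicationId))
# for job in jobs:
#     print(job["jobId"], job["status"], job["name"])
```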


## Exploratory data analysis

On a small sample of the popular flights dataset: https://github.com/ozlerhakan/datacamp/blob/master/Introduction%20to%20PySpark/flights_small.csv.

In [4]:
# Without inferSchema=True every column is read as a string; inference costs an extra pass over the data.
flights = spark.read.csv("flights_small.csv", header=True, inferSchema=True)
print(type(flights))
flights.printSchema()
flights.show(10)

                                                                                

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+----

In [10]:
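# With many columns, a pandas DataFrame renders more readably than show(). This requires pandas,
# which is not installed in this Binder image, hence the error below: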
flights.limit(10).toPandas()

ImportError: Pandas >= 1.0.5 must be installed; however, it was not found.

In [14]:
# Showing the latest available year:
flights.select(F.max(F.col("year"))).show()

+---------+
|max(year)|
+---------+
|     2014|
+---------+



In [17]:
# To get a result into a Python variable, collect() it to the driver; here we take the single value of the first row:
max_year = flights.select(F.max(F.col("year"))).collect()[0][0]
max_year

2014

In [18]:
# All 12 months of 2014 are present. Given how small this CSV file is, it is probably only a sample
# of all US flights (a larger version of this dataset is also commonly used):
flights.groupby("month").count().orderBy("month").show()


+-----+-----+
|month|count|
+-----+-----+
|    1|  748|
|    2|  639|
|    3|  790|
|    4|  817|
|    5|  844|
|    6|  889|
|    7|  976|
|    8| 1065|
|    9|  836|
|   10|  814|
|   11|  780|
|   12|  802|
+-----+-----+



                                                                                

In [20]:
flights = (
    flights.withColumn("source", F.lit("https://github.com/ozlerhakan/datacamp/blob/master/Introduction%20to%20PySpark/flights_small.csv"))
           .withColumnRenamed("tailnum", "tail_number")
)
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-----------+------+------+----+--------+--------+----+------+--------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tail_number|flight|origin|dest|air_time|distance|hour|minute|              source|
+----+-----+---+--------+---------+--------+---------+-------+-----------+------+------+----+--------+--------+----+------+--------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX|     N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|https://github.co...|
|2014|    1| 22|    1040|        5|    1505|        5|     AS|     N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|https://github.co...|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX|     N847VA|   755|   SEA| SFO|     111|     679|  14|    43|https://github.co...|
|2014|    4|  9|    1705|       45|    1839|       34|     WN|     N360SW|   344|   PDX| SJC|      83|     569|  17|     5|https:/

In [9]:
%%time
# Which flights flew to Las Vegas?
flights.filter("dest = 'LAS'").show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   10| 30|     811|       21|    1038|       29|     AS| N433AS|   608|   SEA| LAS|     127|     867|   8|    11|
|2014|    2| 25|     555|       -5|     819|       -3|     AS| N549AS|   604|   SEA| LAS|     121|     867|   5|    55|
|2014|   11| 15|    1858|        8|    2107|        2|     AS| N587AS|   616|   SEA| LAS|     106|     867|  18|    58|
|2014|    9| 28|    2041|       -9|    2251|       -7|     AS| N552AS|   624|   PDX| LAS|     106|     763|  20|    41|
|2014|    7| 27|    1020|       -5|    1251|        2|     AS| N587AS|   586|   SEA| LAS|     130|     867|  10|    20|
|2014|    3| 18|    1832|       -3|    2

In [24]:
# The Spark UI lists jobs as they run; each action (show(), collect(), ...) triggers one or more jobs.
# Here we see that the jobs triggered by the cells above completed successfully:
render_local_spark_ui(subpage="jobs")

http://jupyter-pythonpredictio-2d-2dpyspark-2dbinder-2dbynl6fxc:4040/jobs


| Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total |
|---|---|---|---|---|---|
| 20 | showString at NativeMethodAccessorImpl.java:0 | 2023/04/27 08:31:49 | 85 ms | 1/1 | 1/1 |
| 19 | showString at NativeMethodAccessorImpl.java:0 | 2023/04/27 08:31:10 | 0.2 s | 1/1 | 1/1 |
| 18 | showString at NativeMethodAccessorImpl.java:0 | 2023/04/27 08:30:51 | 0.2 s | 1/1 | 1/1 |
| 17 | showString at NativeMethodAccessorImpl.java:0 | 2023/04/27 08:30:28 | 0.2 s | 1/1 | 1/1 |
| 16 | showString at NativeMethodAccessorImpl.java:0 | 2023/04/27 08:28:26 | 0.2 s | 1/1 (1 skipped) | 1/1 (1 skipped) |
| 15 | showString at NativeMethodAccessorImpl.java:0 | 2023/04/27 08:28:25 | 0.8 s | 1/1 | 1/1 |
| 14 | collect at /tmp/ipykernel_67/2491323169.py:1 | 2023/04/27 08:25:41 | 28 ms | 1/1 (1 skipped) | 1/1 (1 skipped) |
| 13 | collect at /tmp/ipykernel_67/2491323169.py:1 | 2023/04/27 08:25:41 | 0.2 s | 1/1 | 1/1 |
| 12 | collect at /tmp/ipykernel_67/2462410233.py:1 | 2023/04/27 08:25:35 | 35 ms | 1/1 (1 skipped) | 1/1 (1 skipped) |
| 11 | collect at /tmp/ipykernel_67/2462410233.py:1 | 2023/04/27 08:25:35 | 0.2 s | 1/1 | 1/1 |


In [27]:
# Print the physical plan of a query (read it bottom-up; the SQL tab of the Spark UI draws the same plan as a DAG):
flight_stats = flights.groupby("month").count().orderBy("month")
flight_stats.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [month#149 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(month#149 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=332]
      +- HashAggregate(keys=[month#149], functions=[count(1)])
         +- Exchange hashpartitioning(month#149, 200), ENSURE_REQUIREMENTS, [plan_id=329]
            +- HashAggregate(keys=[month#149], functions=[partial_count(1)])
               +- FileScan csv [month#149] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/flights_small.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<month:int>




In [None]:
# Feel free to explore a bit more!

## SparkML

In [5]:
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [6]:
flights.columns

['year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'tailnum',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute']

In [8]:
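from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Label for the classifier: did the flight arrive late? try_cast turns "NA" in the string column arr_delay into null.
data = (flights.withColumn("arr_delay", F.expr("try_cast(arr_delay AS INT)")).dropna(subset=["arr_delay"])
               .withColumn("label", (F.col("arr_delay") > 0).cast("double")))
# Spark ML expects a single numeric vector column of features; string columns must be indexed first.
indexer = StringIndexer(inputCol="carrier", outputCol="carrier_idx", handleInvalid="keep")
assembler = VectorAssembler(inputCols=["month", "day", "distance", "carrier_idx"], outputCol="features")
clf = DecisionTreeClassifier(featuresCol="features", labelCol="label")
df_train, df_test = data.randomSplit([0.8, 0.2], seed=42)
model = Pipeline(stages=[indexer, assembler, clf]).fit(df_train)  # fit() returns a fitted model; predict with it
df_test_with_predictions = model.transform(df_test)
df_test_with_predictions.select("arr_delay", "label", "prediction").show()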

ModuleNotFoundError: No module named 'numpy'

In [None]:
# Feel free to experiment some more! You can use the mindmap shared with this course for inspiration.

## Efficient querying

In [11]:
?flights.write.parquet

Signature:
flights.write.parquet(
    path: str,
    mode: Optional[str] = None,
    partitionBy: Union[str, List[str], NoneType] = None,
    compression: Optional[str] = None,
) -> None
Docstring:
Saves the content of the :class:`DataFrame` in Parquet format at the specified path.

.. versionadded:: 1.4.0

.. versionchanged:: 3.4.0
    Supports Spark Connect.

Parameters
-

In [12]:
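# The order of the partitionBy columns sets the folder nesting (year=.../month=.../day=...).
# You could add extra columns, e.g. ["origin", "year", "month", "day", "hour"]... Note the order!
# mode="overwrite" lets you re-run this cell; the default mode fails if the folder already exists.
flights.write.parquet("flights_results.parquet", mode="overwrite", partitionBy=["year", "month", "day"])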

                                                                                

In [None]:
# Check the folder structure that was created, using the file browser on the left of this window!
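The write above produces Hive-style folders, one level per partition column in the order given (for example `year=2014/month=12/day=31/part-....parquet`). If the file browser is not handy, a short standard-library sketch can list the leaf partition folders. The helper name `list_partitions` is hypothetical; it simply reports folders that directly contain `.parquet` files.

```python
from pathlib import Path
from typing import List

def list_partitions(root: str) -> List[str]:
    """Return the folders below `root` (relative, '/'-separated) that directly contain .parquet files."""
    base = Path(root)
    leaves = {
        path.parent.relative_to(base).as_posix()
        for path in base.rglob("*.parquet")
        if path.is_file()
    }
    return sorted(leaves)

# Usage in this notebook, after the write above:
# for partition in list_partitions("flights_results.parquet"):
#     print(partition)  # e.g. year=2014/month=12/day=31
```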

In [13]:
# You can load a small partition to play around!
flights_31dec = spark.read.parquet("flights_results.parquet/year=2014/month=12/day=31")
flights_31dec.show()

+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|    1719|       -6|    2130|      -15|     AS| N590AS|   875|   SEA| KOA|     342|    2688|  17|    19|
|    1326|       -4|    1600|       -9|     AS| N706AS|   494|   SEA| SAN|     129|    1050|  13|    26|
|     950|      -10|    1358|       -7|     HA| N373HA|    29|   SEA| OGG|     333|    2640|   9|    50|
|     806|       -4|    1352|       -3|     AA| N4XBAA|  1608|   PDX| DFW|     202|    1616|   8|     6|
|    1729|       24|    1928|       23|     WN| N602SW|   894|   SEA| OAK|      97|     671|  17|    29|
|    1205|       30|    1434|       29|     B6| N662JB|   107|   SEA| LGB|     134|     965|  12|     5|
|    2120|       -5|       6|       -9|     AS| N768AS|

In [None]:
# Feel free to play around a bit more! We did not demonstrate all functionality that we presented in the course!

In [14]:
spark.stop()