In [None]:
from pyspark.sql import SparkSession
import os
import tempfile

import urllib.request

# Create Spark session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("sample-read") \
    .getOrCreate()

# Download a small sample CSV (Iris) to a temp file and read it with Spark
url = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
local_path = os.path.join(tempfile.gettempdir(), "iris.csv")
urllib.request.urlretrieve(url, local_path)

df = spark.read.csv(local_path, header=True, inferSchema=True)

# Quick sanity checks
df.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows
root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [4]:
df.columns, df.count()

(['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species'],
 150)

In [5]:
df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [10]:
df.select('sepal_length', 'species').show(5)

+------------+-------+
|sepal_length|species|
+------------+-------+
|         5.1| setosa|
|         4.9| setosa|
|         4.7| setosa|
|         4.6| setosa|
|         5.0| setosa|
+------------+-------+
only showing top 5 rows


In [12]:
df.describe().show()

+-------+------------------+-------------------+------------------+------------------+---------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|  species|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335|  3.057333333333334|3.7580000000000027| 1.199333333333334|     NULL|
| stddev|0.8280661279778637|0.43586628493669793|1.7652982332594662|0.7622376689603467|     NULL|
|    min|               4.3|                2.0|               1.0|               0.1|   setosa|
|    max|               7.9|                4.4|               6.9|               2.5|virginica|
+-------+------------------+-------------------+------------------+------------------+---------+



In [14]:
df.withColumn("sepal_length_10", df['sepal_length']+10).show(5, truncate=False)

+------------+-----------+------------+-----------+-------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|
+------------+-----------+------------+-----------+-------+---------------+
|5.1         |3.5        |1.4         |0.2        |setosa |15.1           |
|4.9         |3.0        |1.4         |0.2        |setosa |14.9           |
|4.7         |3.2        |1.3         |0.2        |setosa |14.7           |
|4.6         |3.1        |1.5         |0.2        |setosa |14.6           |
|5.0         |3.6        |1.4         |0.2        |setosa |15.0           |
+------------+-----------+------------+-----------+-------+---------------+
only showing top 5 rows


In [20]:
df = df.withColumn("sepal_length_10", df['sepal_length']+10)

In [22]:
df.show(5, truncate=False)

+------------+-----------+------------+-----------+-------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|
+------------+-----------+------------+-----------+-------+---------------+
|5.1         |3.5        |1.4         |0.2        |setosa |15.1           |
|4.9         |3.0        |1.4         |0.2        |setosa |14.9           |
|4.7         |3.2        |1.3         |0.2        |setosa |14.7           |
|4.6         |3.1        |1.5         |0.2        |setosa |14.6           |
|5.0         |3.6        |1.4         |0.2        |setosa |15.0           |
+------------+-----------+------------+-----------+-------+---------------+
only showing top 5 rows


# Filtering examples

In [58]:
df.filter(df['sepal_length'] > 5).show()

+------------+-----------+------------+-----------+-------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|
+------------+-----------+------------+-----------+-------+---------------+
|         5.1|        3.5|         1.4|        0.2| setosa|           15.1|
|         5.4|        3.9|         1.7|        0.4| setosa|           15.4|
|         5.4|        3.7|         1.5|        0.2| setosa|           15.4|
|         5.8|        4.0|         1.2|        0.2| setosa|           15.8|
|         5.7|        4.4|         1.5|        0.4| setosa|           15.7|
|         5.4|        3.9|         1.3|        0.4| setosa|           15.4|
|         5.1|        3.5|         1.4|        0.3| setosa|           15.1|
|         5.7|        3.8|         1.7|        0.3| setosa|           15.7|
|         5.1|        3.8|         1.5|        0.3| setosa|           15.1|
|         5.4|        3.4|         1.7|        0.2| setosa|           15.4|
|         5.

In [59]:
df.filter(df['species'] == 'setosa').show(5)

+------------+-----------+------------+-----------+-------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|
+------------+-----------+------------+-----------+-------+---------------+
|         5.1|        3.5|         1.4|        0.2| setosa|           15.1|
|         4.9|        3.0|         1.4|        0.2| setosa|           14.9|
|         4.7|        3.2|         1.3|        0.2| setosa|           14.7|
|         4.6|        3.1|         1.5|        0.2| setosa|           14.6|
|         5.0|        3.6|         1.4|        0.2| setosa|           15.0|
+------------+-----------+------------+-----------+-------+---------------+
only showing top 5 rows


In [60]:
df.filter(df['species'] == 'setosa').select('sepal_length', 'sepal_width').show(5)

+------------+-----------+
|sepal_length|sepal_width|
+------------+-----------+
|         5.1|        3.5|
|         4.9|        3.0|
|         4.7|        3.2|
|         4.6|        3.1|
|         5.0|        3.6|
+------------+-----------+
only showing top 5 rows


In [61]:
df.filter(df['sepal_length'] > 5.0).filter(df['sepal_width'] < 3.5).show(5)

+------------+-----------+------------+-----------+-------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|
+------------+-----------+------------+-----------+-------+---------------+
|         5.4|        3.4|         1.7|        0.2| setosa|           15.4|
|         5.1|        3.3|         1.7|        0.5| setosa|           15.1|
|         5.2|        3.4|         1.4|        0.2| setosa|           15.2|
|         5.4|        3.4|         1.5|        0.4| setosa|           15.4|
|         5.1|        3.4|         1.5|        0.2| setosa|           15.1|
+------------+-----------+------------+-----------+-------+---------------+
only showing top 5 rows


In [62]:
df.filter((df['sepal_length']>5)&(df['sepal_width']<3.5)).show(5)  # Just to have a place to end

+------------+-----------+------------+-----------+-------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|
+------------+-----------+------------+-----------+-------+---------------+
|         5.4|        3.4|         1.7|        0.2| setosa|           15.4|
|         5.1|        3.3|         1.7|        0.5| setosa|           15.1|
|         5.2|        3.4|         1.4|        0.2| setosa|           15.2|
|         5.4|        3.4|         1.5|        0.4| setosa|           15.4|
|         5.1|        3.4|         1.5|        0.2| setosa|           15.1|
+------------+-----------+------------+-----------+-------+---------------+
only showing top 5 rows


In [63]:
df.filter((df['sepal_length']>5)|(df['sepal_width']<3.5)).show(5)

+------------+-----------+------------+-----------+-------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|
+------------+-----------+------------+-----------+-------+---------------+
|         5.1|        3.5|         1.4|        0.2| setosa|           15.1|
|         4.9|        3.0|         1.4|        0.2| setosa|           14.9|
|         4.7|        3.2|         1.3|        0.2| setosa|           14.7|
|         4.6|        3.1|         1.5|        0.2| setosa|           14.6|
|         5.4|        3.9|         1.7|        0.4| setosa|           15.4|
+------------+-----------+------------+-----------+-------+---------------+
only showing top 5 rows


In [64]:
df.select("species").distinct().show(), df.select("species").distinct().count()

+----------+
|   species|
+----------+
| virginica|
|versicolor|
|    setosa|
+----------+



(None, 3)

# Grouping and aggregations examples

In [35]:
df.groupBy("species").count().show()

+----------+-----+
|   species|count|
+----------+-----+
| virginica|   50|
|versicolor|   50|
|    setosa|   50|
+----------+-----+



In [37]:
df.sample(fraction=0.5, seed=42).groupBy("species").count().orderBy("count", ascending=False).show()

+----------+-----+
|   species|count|
+----------+-----+
|versicolor|   29|
| virginica|   26|
|    setosa|   17|
+----------+-----+



In [38]:
df.groupBy("species").mean().show()

+----------+-----------------+------------------+------------------+------------------+--------------------+
|   species|avg(sepal_length)|  avg(sepal_width)| avg(petal_length)|  avg(petal_width)|avg(sepal_length_10)|
+----------+-----------------+------------------+------------------+------------------+--------------------+
| virginica|6.587999999999998|2.9739999999999998|             5.552|             2.026|  16.587999999999997|
|versicolor|            5.936|2.7700000000000005|              4.26|1.3259999999999998|  15.936000000000003|
|    setosa|5.005999999999999| 3.428000000000001|1.4620000000000002|0.2459999999999999|  15.006000000000002|
+----------+-----------------+------------------+------------------+------------------+--------------------+



In [39]:
df.groupBy("species").sum().show()

+----------+------------------+------------------+------------------+------------------+--------------------+
|   species| sum(sepal_length)|  sum(sepal_width)| sum(petal_length)|  sum(petal_width)|sum(sepal_length_10)|
+----------+------------------+------------------+------------------+------------------+--------------------+
| virginica| 329.3999999999999|             148.7|277.59999999999997|101.29999999999998|   829.3999999999999|
|versicolor|             296.8|138.50000000000003|212.99999999999997|              66.3|   796.8000000000002|
|    setosa|250.29999999999998|171.40000000000003| 73.10000000000001|12.299999999999995|   750.3000000000001|
+----------+------------------+------------------+------------------+------------------+--------------------+



In [40]:
df.groupBy("species").max().show()

+----------+-----------------+----------------+-----------------+----------------+--------------------+
|   species|max(sepal_length)|max(sepal_width)|max(petal_length)|max(petal_width)|max(sepal_length_10)|
+----------+-----------------+----------------+-----------------+----------------+--------------------+
| virginica|              7.9|             3.8|              6.9|             2.5|                17.9|
|versicolor|              7.0|             3.4|              5.1|             1.8|                17.0|
|    setosa|              5.8|             4.4|              1.9|             0.6|                15.8|
+----------+-----------------+----------------+-----------------+----------------+--------------------+



In [41]:
df.groupBy("species").min().show()

+----------+-----------------+----------------+-----------------+----------------+--------------------+
|   species|min(sepal_length)|min(sepal_width)|min(petal_length)|min(petal_width)|min(sepal_length_10)|
+----------+-----------------+----------------+-----------------+----------------+--------------------+
| virginica|              4.9|             2.2|              4.5|             1.4|                14.9|
|versicolor|              4.9|             2.0|              3.0|             1.0|                14.9|
|    setosa|              4.3|             2.3|              1.0|             0.1|                14.3|
+----------+-----------------+----------------+-----------------+----------------+--------------------+



In [None]:
# Aggregations with multiple functions on multiple columns

In [43]:
df.groupBy("species").agg({'sepal_length': 'avg', 'sepal_width': 'max', 'petal_width':'sum'}).show()

+----------+----------------+------------------+-----------------+
|   species|max(sepal_width)|  sum(petal_width)|avg(sepal_length)|
+----------+----------------+------------------+-----------------+
| virginica|             3.8|101.29999999999998|6.587999999999998|
|versicolor|             3.4|              66.3|            5.936|
|    setosa|             4.4|12.299999999999995|5.005999999999999|
+----------+----------------+------------------+-----------------+



# User-Defined Functions (UDFs)

In [69]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [70]:
def categorize_sepal_length(length):
    if length < 5.0:
        return 'short'
    elif 5.0 <= length < 6.5:
        return 'medium'
    else:
        return 'long'

In [71]:
categorize_sepal_length_udf = udf(categorize_sepal_length, StringType())

In [72]:
df.withColumn('sepal_length_category', categorize_sepal_length_udf(df['sepal_length'])).show(5)

+------------+-----------+------------+-----------+-------+---------------+---------------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|sepal_length_category|
+------------+-----------+------------+-----------+-------+---------------+---------------------+
|         5.1|        3.5|         1.4|        0.2| setosa|           15.1|               medium|
|         4.9|        3.0|         1.4|        0.2| setosa|           14.9|                short|
|         4.7|        3.2|         1.3|        0.2| setosa|           14.7|                short|
|         4.6|        3.1|         1.5|        0.2| setosa|           14.6|                short|
|         5.0|        3.6|         1.4|        0.2| setosa|           15.0|               medium|
+------------+-----------+------------+-----------+-------+---------------+---------------------+
only showing top 5 rows


In [77]:
petal_length_category_udf = udf(lambda x: 'short' if x <1.4 else 'long', StringType())

In [78]:
df.withColumn('petal_length_category', petal_length_category_udf(df['petal_length'])).show(5)

+------------+-----------+------------+-----------+-------+---------------+---------------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|petal_length_category|
+------------+-----------+------------+-----------+-------+---------------+---------------------+
|         5.1|        3.5|         1.4|        0.2| setosa|           15.1|                 long|
|         4.9|        3.0|         1.4|        0.2| setosa|           14.9|                 long|
|         4.7|        3.2|         1.3|        0.2| setosa|           14.7|                short|
|         4.6|        3.1|         1.5|        0.2| setosa|           14.6|                 long|
|         5.0|        3.6|         1.4|        0.2| setosa|           15.0|                 long|
+------------+-----------+------------+-----------+-------+---------------+---------------------+
only showing top 5 rows


## Pandas UDFs

In [86]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType

In [80]:
def remaining_sepal_width(sepal_width):
    sepal_width_remaining = (4.5 - sepal_width)
    return sepal_width_remaining

In [83]:
remaining_sepal_width_udf = pandas_udf(remaining_sepal_width, IntegerType())

In [84]:
df.withColumn('remaining_sepal_width', remaining_sepal_width_udf(df['sepal_width'])).show()

+------------+-----------+------------+-----------+-------+---------------+---------------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|remaining_sepal_width|
+------------+-----------+------------+-----------+-------+---------------+---------------------+
|         5.1|        3.5|         1.4|        0.2| setosa|           15.1|                    1|
|         4.9|        3.0|         1.4|        0.2| setosa|           14.9|                    1|
|         4.7|        3.2|         1.3|        0.2| setosa|           14.7|                    1|
|         4.6|        3.1|         1.5|        0.2| setosa|           14.6|                    1|
|         5.0|        3.6|         1.4|        0.2| setosa|           15.0|                    0|
|         5.4|        3.9|         1.7|        0.4| setosa|           15.4|                    0|
|         4.6|        3.4|         1.4|        0.3| setosa|           14.6|                    1|
|         5.0|      

In [85]:
def petal_area_approximation(petal_length, petal_width):
    x = petal_length * petal_width
    return x

In [87]:
prod_udf = pandas_udf(petal_area_approximation, DoubleType())

In [88]:
df.withColumn('petal_area_approximation', prod_udf(df['petal_length'], df['petal_width'])).show()

+------------+-----------+------------+-----------+-------+---------------+------------------------+
|sepal_length|sepal_width|petal_length|petal_width|species|sepal_length_10|petal_area_approximation|
+------------+-----------+------------+-----------+-------+---------------+------------------------+
|         5.1|        3.5|         1.4|        0.2| setosa|           15.1|     0.27999999999999997|
|         4.9|        3.0|         1.4|        0.2| setosa|           14.9|     0.27999999999999997|
|         4.7|        3.2|         1.3|        0.2| setosa|           14.7|                    0.26|
|         4.6|        3.1|         1.5|        0.2| setosa|           14.6|     0.30000000000000004|
|         5.0|        3.6|         1.4|        0.2| setosa|           15.0|     0.27999999999999997|
|         5.4|        3.9|         1.7|        0.4| setosa|           15.4|                    0.68|
|         4.6|        3.4|         1.4|        0.3| setosa|           14.6|                

# Data Manipulation and Cleaning

In [89]:
df.count()

150

In [91]:
df = df.dropDuplicates()

In [92]:
df.count()

149

In [94]:
df_new = df.drop('sepal_length_10')
df_new.show(5)

+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|
+------------+-----------+------------+-----------+----------+
|         5.9|        3.2|         4.8|        1.8|versicolor|
|         7.2|        3.2|         6.0|        1.8| virginica|
|         4.8|        3.4|         1.9|        0.2|    setosa|
|         6.0|        2.2|         5.0|        1.5| virginica|
|         6.9|        3.2|         5.7|        2.3| virginica|
+------------+-----------+------------+-----------+----------+
only showing top 5 rows


In [96]:
pwd

'/home/jovyan'

In [99]:
write_path = '/home/jovyan/df_csv'
df.coalesce(1).write.format('csv').option('header', 'true').save(write_path)

In [104]:
ls ../..

[0m[01;36mbin[0m@   [01;34mdev[0m/  [01;34mhome[0m/  [01;36mlib64[0m@  [01;34mmnt[0m/  [01;34mproc[0m/     [01;34mroot[0m/  [01;36msbin[0m@  [01;34msys[0m/  [01;34musr[0m/
[01;34mboot[0m/  [01;34metc[0m/  [01;36mlib[0m@   [01;34mmedia[0m/  [01;34mopt[0m/  [34;42mproject[0m/  [01;34mrun[0m/   [01;34msrv[0m/   [30;42mtmp[0m/  [01;34mvar[0m/


In [105]:
parquet_uri='/home/jovyan/work/df_parquet'
df.write.format('parquet').save(parquet_uri)