In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Start spark session

In [2]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()

# Load data 

In [3]:
df_flights_2012 = spark\
 .read\
 .option("inferSchema", "true")\
 .option("header", "true")\
 .csv("./data/2012-summary.csv")

df_flights_2012.createOrReplaceTempView("view_flight_2012")

# Column operations

In [4]:
df_flights_2012.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [5]:
spark.sql("SELECT DEST_COUNTRY_NAME FROM view_flight_2012 LIMIT 2").show()

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+



In [6]:
df_flights_2012.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Croatia|
|    United States|            Ireland|
+-----------------+-------------------+
only showing top 2 rows



In [7]:
# SQL?
spark.sql("SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM view_flight_2012 LIMIT 2").show()

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Croatia|
|    United States|            Ireland|
+-----------------+-------------------+



Using expressions to columns

In [8]:
from pyspark.sql.functions import expr, col, column

df_flights_2012.select(
 expr("DEST_COUNTRY_NAME as End_Country"))\
 .show(2)

+-------------+
|  End_Country|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



Further manipulate the results using `alias`

In [9]:
df_flights_2012.select(expr("DEST_COUNTRY_NAME")\
                       .alias("End_Country"))\
                        .show(2)

+-------------+
|  End_Country|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



Complex expression as far as they are non-aggregating

In [10]:
df_flights_2012.selectExpr(
 "*", # all original columns
 "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
 .show()

+--------------------+-------------------+-----+-------------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+--------------------+-------------------+-----+-------------+
|       United States|            Croatia|    1|        false|
|       United States|            Ireland|  252|        false|
|               Egypt|      United States|   13|        false|
|       United States|              India|   62|        false|
|       United States|          Singapore|   25|        false|
|       United States|            Grenada|   46|        false|
|          Costa Rica|      United States|  522|        false|
|             Senegal|      United States|   31|        false|
|              Guyana|      United States|   65|        false|
|       United States|   Marshall Islands|   30|        false|
|       United States|       Sint Maarten|  245|        false|
|             Bolivia|      United States|   35|        false|
|            Anguilla|      United States|   19|       

We can also use SQL like aggregation functions

In [11]:
df_flights_2012.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

+------------------+---------------------------------+
|        avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+------------------+---------------------------------+
|1723.1836734693877|                              127|
+------------------+---------------------------------+



Using Literals for Spark expressions

In [12]:
from pyspark.sql.functions import lit, expr

df_flights_2012.select(expr("*"), lit(1).alias("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Croatia|    1|  1|
|    United States|            Ireland|  252|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



Adding columns

In [13]:
df_flights_2012.withColumn("NewNumer", lit(3)).show(5)

+-----------------+-------------------+-----+--------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|NewNumer|
+-----------------+-------------------+-----+--------+
|    United States|            Croatia|    1|       3|
|    United States|            Ireland|  252|       3|
|            Egypt|      United States|   13|       3|
|    United States|              India|   62|       3|
|    United States|          Singapore|   25|       3|
+-----------------+-------------------+-----+--------+
only showing top 5 rows



Comparing columns

In [14]:
df_flights_2012.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
 .show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  252|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



Renaming columns??

In [15]:
df_flights_2012.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count', 'Destination']

Renaming columns

In [16]:
df_flights_2012.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

# Row operations

Filtering rows

In [17]:
df_flights_2012.where("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|               Togo|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [18]:
df_flights_2012.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
 .show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|               Togo|    1|
|    United States|    Solomon Islands|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



Getting unique rows

In [19]:
df_flights_2012.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

245

Choosing random samples

In [20]:
df_flights_2012.sample(withReplacement = False, 
                       fraction = 0.5, 
                       seed = 5).show()

+--------------------+--------------------+-----+
|   DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+--------------------+--------------------+-----+
|       United States|             Croatia|    1|
|       United States|             Ireland|  252|
|       United States|           Singapore|   25|
|       United States|             Grenada|   46|
|          Costa Rica|       United States|  522|
|              Guyana|       United States|   65|
|       United States|    Marshall Islands|   30|
|             Bolivia|       United States|   35|
|            Anguilla|       United States|   19|
|Turks and Caicos ...|       United States|  183|
|Saint Vincent and...|       United States|    6|
|       United States|              Russia|  148|
|       United States|Federated States ...|   63|
|       United States|         Netherlands|  607|
|             Iceland|       United States|  137|
|    Marshall Islands|       United States|   60|
|          Luxembourg|       United States|  111|


Random splits

In [22]:
dataFrames = df_flights_2012.randomSplit([0.25, 0.75], seed=5)

dataFrames[0].count() > dataFrames[1].count() # False

False

Concatenating and Appending Rows (Union)

In [23]:
from pyspark.sql import Row
from pyspark.sql.functions import col

schema = df_flights_2012.schema
newRows = [Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1)]

parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

# in Python
df_flights_2012.union(newDF)\
 .where("count = 1")\
 .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
 .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|               Togo|    1|
|    United States|    Solomon Islands|    1|
|    United States|         The Gambia|    1|
|    United States|            Tunisia|    1|
|    United States|         Azerbaijan|    1|
|    United States|           Cambodia|    1|
|    United States|          Greenland|    1|
|    United States|      French Guiana|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



Sorting rows

In [24]:
df_flights_2012.sort("count").show(5)

df_flights_2012.orderBy("count", "DEST_COUNTRY_NAME").show(5)

df_flights_2012.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       Kazakhstan|      United States|    1|
|    United States|         Azerbaijan|    1|
|    United States|         The Gambia|    1|
|    United States|    Solomon Islands|    1|
|    United States|               Togo|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Belarus|      United States|    1|
|         Cameroon|      United States|    1|
|          Croatia|      United States|    1|
|        Greenland|      United States|    1|
|       Kazakhstan|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-----------

Limited results

In [25]:
df_flights_2012.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|            Ireland|  252|
|            Egypt|      United States|   13|
|    United States|              India|   62|
|    United States|          Singapore|   25|
+-----------------+-------------------+-----+



In [26]:
df_flights_2012.orderBy(expr("count desc")).limit(6).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       Kazakhstan|      United States|    1|
|    United States|         Azerbaijan|    1|
|    United States|         The Gambia|    1|
|    United States|    Solomon Islands|    1|
|    United States|               Togo|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+



Repartition and Coalesce

In [27]:
df_flights_2012.rdd.getNumPartitions()

1

In [28]:
df_flights_2012.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

In [29]:
df_flights_2012.repartition(4, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

> Note: Try out examples from Chapter 5 on a different dataset.