# Hands-on with Spark - Google Dataproc

![spark](https://cdn-images-1.medium.com/max/300/1*c8CtvqKJDVUnMoPGujF5fA.png)

PySpark is the Python API for Spark. It lets you use Spark from Python for tasks such as machine learning (ML), exploratory data analysis (EDA), and ETL for data platforms, all of them in a distributed manner.

![pyspark](https://editor.analyticsvidhya.com/uploads/20981sp3.JPG)

In simple terms, each time you submit a PySpark job, the operations in your code are translated into a plan of stages and tasks that run on the Java Virtual Machine (JVM). Spark also uses lazy evaluation: it delays computation for as long as it can. As you chain transformations, Spark only builds a plan for how to execute them and does nothing else. When you finally ask for a result (i.e., call an action), it executes the plan, which is made up of all the transformations you specified in your code.


For our lesson, we will be running Spark in **local** mode.

In this mode, Spark runs on a single machine, utilizing available cores (similar to Polars).
It's useful for development, learning, testing, and debugging since everything runs on the local machine without needing a cluster.

In actual production environments, Spark is usually run on a cluster of machines (YARN, Mesos or Kubernetes mode).

## Initializing Spark

For multi-node Spark, create the Spark session with YARN as the master:

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DistributedSparkApp") \
    .master("yarn") \
    .config("spark.executor.instances", "3") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "2g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

25/12/08 11:18:51 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Evaluating `spark` in a notebook cell displays a summary of the SparkSession and its SparkContext:

In [2]:
spark

## Spark SQL and DataFrames

Spark SQL and DataFrames are higher-level modules of Apache Spark. They work together to make data processing with Spark more intuitive and better optimized, especially for those who come from a SQL or data analytics background.

- **SQL Interface to Spark**: Spark SQL lets users execute SQL queries alongside Spark programs.

- **DataFrames are an abstraction over RDDs**: A DataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a table in a relational database or a data frame in R or Python, but with optimizations for distributed processing and scalability. While the RDD (Resilient Distributed Dataset) is the fundamental data structure in Spark, DataFrames provide a higher-level abstraction that is often easier to use and better optimized for many tasks.

### DataFrame Creation

In [3]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

In [4]:
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

When no schema is given, PySpark infers the DataFrame schema (column names and dtypes) from the data itself, by default by inspecting the first rows rather than the whole dataset.

In [5]:
df.show(5)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



                                                                                


You can also pass an explicit schema:

In [6]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

A PySpark DataFrame is lazily evaluated: invoking `df` on its own does not trigger any computation or display any rows, only the schema. To see the data, you need to explicitly call the `show` method:

In [7]:
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



### Reading data into Spark

PySpark supports many data sources, such as CSV, Parquet, ORC, JDBC, text, and Avro. These can be loaded into DataFrames through `spark.read`.

In [8]:
df = spark.read.parquet('gs://su-artifacts/userdata1.parquet')
df.show()

+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|  registration_dttm| id|first_name|last_name|               email|gender|     ip_address|                 cc|             country| birthdate|   salary|               title|            comments|
+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|    1.197.201.2|   6759521864920116|           Indonesia|  3/8/1971| 49756.53|    Internal Auditor|               1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male| 218.111.175.34|                   |              Canada| 1/16/1968|150280.17|       Accountant IV|                    |
|2016-02-03 01:09:31|  3|

                                                                                

In [9]:
df = spark.read.orc('gs://su-artifacts/userdata1.orc')
df.show()

+-------------------+-----+--------+---------+--------------------+------+---------------+----------------+--------------------+----------+---------+--------------------+------+
|              _col0|_col1|   _col2|    _col3|               _col4| _col5|          _col6|           _col7|               _col8|     _col9|   _col10|              _col11|_col12|
+-------------------+-----+--------+---------+--------------------+------+---------------+----------------+--------------------+----------+---------+--------------------+------+
|2016-02-03 13:36:39|    1|  Donald|    Lewis|dlewis0@clickbank...|  Male|  102.22.124.20|                |           Indonesia|  7/9/1972|140249.37|Senior Financial ...|      |
|2016-02-03 00:22:28|    2|  Walter|  Collins|wcollins1@bloglov...|  Male|   247.28.26.93|3587726269478025|               China|          |     null|                    |      |
|2016-02-03 18:29:04|    3|Michelle|Henderson|mhenderson2@geoci...|Female| 193.68.146.150|                |   

                                                                                

In [10]:
movies = spark.read.csv('gs://su-artifacts/movies_metadata.csv', header=True)
ratings = spark.read.csv('gs://su-artifacts/ratings.csv', header=True)
taxi = spark.read.csv('gs://su-artifacts/taxi_trip_data.csv', header=True)

                                                                                

In [11]:
movies.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

In [12]:
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [13]:
taxi.printSchema()

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- imp_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)



In [14]:
movies = spark.read.csv('gs://su-artifacts/movies_metadata.csv', header=True, inferSchema=True, quote="\"", escape="\"")

                                                                                

In [15]:
movies.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

In [16]:
ratings = spark.read.csv('gs://su-artifacts/ratings.csv', header=True, inferSchema=True)

                                                                                

In [17]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



Passing an explicit schema speeds up the read, because Spark no longer needs an extra pass over the file to infer the column types.

In [18]:
dtypes = ['vendor_id integer',
 'pickup_datetime timestamp',
 'dropoff_datetime timestamp',
 'passenger_count integer',
 'trip_distance double',
 'rate_code integer',
 'store_and_fwd_flag string',
 'payment_type integer',
 'fare_amount double',
 'extra double',
 'mta_tax double',
 'tip_amount double',
 'tolls_amount double',
 'imp_surcharge double',
 'total_amount double',
 'pickup_location_id integer',
 'dropoff_location_id integer']

In [19]:
taxi = spark.read.csv('gs://su-artifacts/taxi_trip_data.csv', header=True,
                      schema=', '.join(dtypes))

In [20]:
taxi.printSchema()

root
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- imp_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)



There are several ways to view your DataFrame in PySpark:

1.   `df.take(5)` returns a list of five `Row` objects.
2.   `df.collect()` returns all of the data in the DataFrame to the driver. Be careful when using it: with a large dataset, you can easily crash the driver node.
3.   `df.show()` is the most commonly used method to view a DataFrame. It accepts a few parameters, such as the number of rows and truncation. For example, `df.show(5, False)` or `df.show(5, truncate=False)` shows five rows with full column contents, without truncation.
4.   `df.limit(5)` **returns a new DataFrame** containing the first five rows. Because Spark is distributed, there is no guarantee that `df.limit()` will give you the same rows each time unless the data is explicitly ordered.

In [21]:
movies.show(10)

+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+
|adult|belongs_to_collection|  budget|              genres|            homepage|   id|  imdb_id|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|  revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|
+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+-----------

                                                                                

In [22]:
movies.show(5, truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-------------------------------------------------------------------------------------------------+------------------------------------+-----+---------+-----------------+---------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------

                                                                                

In [23]:
# can also be shown vertically
movies.show(1, vertical=True, truncate=False)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 adult                 | False                                                                                                                                                                                                                                                                                                           
 belongs_to_collection | {'id': 10194, 'name': 'Toy Story Collection', 'poster_path': '/7G9915LfUQ2lVfwMEEhDsn3kT4B.jpg', 'backdrop_path': '/9FBwqcd9IRruEDUrTdcaafOMKUq.jpg'}                                                                                                                                                           
 budget   

                                                                                

In [24]:
ratings.show(10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
|     1|   1221|   5.0|1425941546|
|     1|   1246|   5.0|1425941556|
|     1|   1968|   4.0|1425942148|
|     1|   2762|   4.5|1425941300|
|     1|   2918|   5.0|1425941593|
|     1|   2959|   4.0|1425941601|
|     1|   4226|   4.0|1425942228|
+------+-------+------+----------+
only showing top 10 rows



                                                                                

In [25]:
taxi.show(10)

+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|imp_surcharge|total_amount|pickup_location_id|dropoff_location_id|
+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+
|        2|2018-03-29 13:37:13|2018-03-29 14:17:01|              1|        18.15|        3|                 N|           1|       70.0|  0.0|    0.0|     16.16|        10.5|          0.3|       96.96|               161|                  1|
|        2|2018-03-29 13:37:18|2018-03-2

                                                                                

In [26]:
taxi.limit(5).show()



+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|imp_surcharge|total_amount|pickup_location_id|dropoff_location_id|
+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+
|        2|2018-03-29 13:37:13|2018-03-29 14:17:01|              1|        18.15|        3|                 N|           1|       70.0|  0.0|    0.0|     16.16|        10.5|          0.3|       96.96|               161|                  1|
|        2|2018-03-29 13:37:18|2018-03-2

                                                                                

In [27]:
movies.describe().show()

                                                                                

+-------+-------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+-----------------------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+-----+--------------------+------------------+
|summary|              adult|belongs_to_collection|              budget|              genres|            homepage|                  id|             imdb_id|original_language|                     original_title|            overview|          popularity|         poster_path|    production_companies|production_countries|        release_date|             revenue|             runtime|    spoken_languages|            status|           tagline|               title|video|        vote_average|  

In [28]:
ratings.describe().show()



+-------+-----------------+------------------+------------------+--------------------+
|summary|           userId|           movieId|            rating|           timestamp|
+-------+-----------------+------------------+------------------+--------------------+
|  count|         26024289|          26024289|          26024289|            26024289|
|   mean| 135037.090248114|15849.109677040553|3.5280903543608817|1.1712584326913223E9|
| stddev|78176.19722170963| 31085.25753139151|1.0654427636662405|2.0528887028185263E8|
|    min|                1|                 1|               0.5|           789652004|
|    max|           270896|            176275|               5.0|          1501829870|
+-------+-----------------+------------------+------------------+--------------------+



                                                                                

In [51]:
taxi.describe().show()

                                                                                

+-------+------------------+------------------+-----------------+------------------+------------------+-------------------+------------------+------------------+-------------------+-----------------+------------------+--------------------+------------------+------------------+-------------------+------------------+
|summary|         vendor_id|   passenger_count|    trip_distance|         rate_code|store_and_fwd_flag|       payment_type|       fare_amount|             extra|            mta_tax|       tip_amount|      tolls_amount|       imp_surcharge|      total_amount|pickup_location_id|dropoff_location_id|     trip_duration|
+-------+------------------+------------------+-----------------+------------------+------------------+-------------------+------------------+------------------+-------------------+-----------------+------------------+--------------------+------------------+------------------+-------------------+------------------+
|  count|          10000000|          10000000|  

### DataFrame Operations on Columns

We will go over the following in this section:

1.   Selecting a Column
2.   Selecting Multiple Columns
3.   Adding New Columns
4.   Renaming Columns
5.   Removing Columns


In [30]:
movies.id

Column<'id'>

Just like a DataFrame, a column selection is lazily evaluated: it does not trigger any computation and simply returns a `Column` instance.

A `Column` can be passed to `select`, which returns another DataFrame:

In [31]:
movies.select(movies.id).show()

+-----+
|   id|
+-----+
|  862|
| 8844|
|15602|
|31357|
|11862|
|  949|
|11860|
|45325|
| 9091|
|  710|
| 9087|
|12110|
|21032|
|10858|
| 1408|
|  524|
| 4584|
|    5|
| 9273|
|11517|
+-----+
only showing top 20 rows



                                                                                

In [32]:
movies.select(movies.title, movies.overview).show(truncate=False)

+------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title                         |overview                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+------------------------------+----

                                                                                

Creating a new column:

In [33]:
from pyspark.sql.functions import lit

In [34]:
# lit means literal. It populates the row with the literal value given
ratings.withColumn('review', lit('Great movie!')).show(10)

+------+-------+------+----------+------------+
|userId|movieId|rating| timestamp|      review|
+------+-------+------+----------+------------+
|     1|    110|   1.0|1425941529|Great movie!|
|     1|    147|   4.5|1425942435|Great movie!|
|     1|    858|   5.0|1425941523|Great movie!|
|     1|   1221|   5.0|1425941546|Great movie!|
|     1|   1246|   5.0|1425941556|Great movie!|
|     1|   1968|   4.0|1425942148|Great movie!|
|     1|   2762|   4.5|1425941300|Great movie!|
|     1|   2918|   5.0|1425941593|Great movie!|
|     1|   2959|   4.0|1425941601|Great movie!|
|     1|   4226|   4.0|1425942228|Great movie!|
+------+-------+------+----------+------------+
only showing top 10 rows



                                                                                

In [35]:
ratings.withColumn('review', lit('Great movie!')) \
       .withColumn('mood', lit(5)) \
       .show(10)

+------+-------+------+----------+------------+----+
|userId|movieId|rating| timestamp|      review|mood|
+------+-------+------+----------+------------+----+
|     1|    110|   1.0|1425941529|Great movie!|   5|
|     1|    147|   4.5|1425942435|Great movie!|   5|
|     1|    858|   5.0|1425941523|Great movie!|   5|
|     1|   1221|   5.0|1425941546|Great movie!|   5|
|     1|   1246|   5.0|1425941556|Great movie!|   5|
|     1|   1968|   4.0|1425942148|Great movie!|   5|
|     1|   2762|   4.5|1425941300|Great movie!|   5|
|     1|   2918|   5.0|1425941593|Great movie!|   5|
|     1|   2959|   4.0|1425941601|Great movie!|   5|
|     1|   4226|   4.0|1425942228|Great movie!|   5|
+------+-------+------+----------+------------+----+
only showing top 10 rows



                                                                                

In [36]:
movies.withColumnRenamed('overview', 'summary').show(5)

+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+
|adult|belongs_to_collection|  budget|              genres|            homepage|   id|  imdb_id|original_language|      original_title|             summary|popularity|         poster_path|production_companies|production_countries|release_date|  revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|
+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+-----------

                                                                                

In [37]:
ratings.drop('timestamp').show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    110|   1.0|
|     1|    147|   4.5|
|     1|    858|   5.0|
|     1|   1221|   5.0|
|     1|   1246|   5.0|
|     1|   1968|   4.0|
|     1|   2762|   4.5|
|     1|   2918|   5.0|
|     1|   2959|   4.0|
|     1|   4226|   4.0|
|     1|   4878|   5.0|
|     1|   5577|   5.0|
|     1|  33794|   4.0|
|     1|  54503|   3.5|
|     1|  58559|   4.0|
|     1|  59315|   5.0|
|     1|  68358|   5.0|
|     1|  69844|   5.0|
|     1|  73017|   5.0|
|     1|  81834|   5.0|
+------+-------+------+
only showing top 20 rows



                                                                                

### Common Transformation Functions

In [38]:
# Functions available in PySpark
from pyspark.sql import functions
# We can use the dir function to view the available functions
print(dir(functions))



String functions:

In [39]:
from pyspark.sql.functions import concat, lower, upper, substring

In [40]:
movies.show(5)

+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+
|adult|belongs_to_collection|  budget|              genres|            homepage|   id|  imdb_id|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|  revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|
+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+---------+-------+-----------

                                                                                

In [41]:
movies.select(movies.title, movies.tagline, upper(movies.title), lower(movies.tagline),
              substring(movies.overview, 1, 10),
              concat(movies.title, lit(' Part 1')).alias('new_title')
              ).show()

+--------------------+--------------------+--------------------+--------------------+--------------------------+--------------------+
|               title|             tagline|        upper(title)|      lower(tagline)|substring(overview, 1, 10)|           new_title|
+--------------------+--------------------+--------------------+--------------------+--------------------------+--------------------+
|           Toy Story|                null|           TOY STORY|                null|                Led by Woo|    Toy Story Part 1|
|             Jumanji|Roll the dice and...|             JUMANJI|roll the dice and...|                When sibli|      Jumanji Part 1|
|    Grumpier Old Men|Still Yelling. St...|    GRUMPIER OLD MEN|still yelling. st...|                A family w|Grumpier Old Men ...|
|   Waiting to Exhale|Friends are the p...|   WAITING TO EXHALE|friends are the p...|                Cheated on|Waiting to Exhale...|
|Father of the Bri...|Just When His Wor...|FATHER OF THE BRI..
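
One detail is easy to miss in the output above: `substring(movies.overview, 1, 10)` returns the first ten characters, so the position argument counts from 1 rather than from 0 as Python slices do. The plain-Python sketch below mirrors the three string operations on a single value. `substring_1_based` is a hypothetical helper written only for this illustration (it is not part of PySpark) and it assumes `pos >= 1`.

```python
def substring_1_based(text, pos, length):
    """Hypothetical helper: take `length` characters starting at 1-based `pos`."""
    start = pos - 1  # convert the 1-based position to a 0-based slice index
    return text[start:start + length]

title = "Toy Story"
print(substring_1_based(title, 1, 3))  # Toy
print(title.upper())                   # TOY STORY
print(title + " Part 1")               # Toy Story Part 1
```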

                                                                                

Numeric functions:

In [42]:
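# Note: importing max and round by name shadows Python's built-in max() and round() in this notebook
from pyspark.sql.functions import mean, max, round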

In [43]:
ratings.select(mean(ratings.rating), max(ratings.rating)).show()



+------------------+-----------+
|       avg(rating)|max(rating)|
+------------------+-----------+
|3.5280903543608817|        5.0|
+------------------+-----------+



                                                                                

In [44]:
ratings.withColumn('new_rating', round(ratings.rating)).show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|new_rating|
+------+-------+------+----------+----------+
|     1|    110|   1.0|1425941529|       1.0|
|     1|    147|   4.5|1425942435|       5.0|
|     1|    858|   5.0|1425941523|       5.0|
|     1|   1221|   5.0|1425941546|       5.0|
|     1|   1246|   5.0|1425941556|       5.0|
|     1|   1968|   4.0|1425942148|       4.0|
|     1|   2762|   4.5|1425941300|       5.0|
|     1|   2918|   5.0|1425941593|       5.0|
|     1|   2959|   4.0|1425941601|       4.0|
|     1|   4226|   4.0|1425942228|       4.0|
|     1|   4878|   5.0|1425941434|       5.0|
|     1|   5577|   5.0|1425941397|       5.0|
|     1|  33794|   4.0|1425942005|       4.0|
|     1|  54503|   3.5|1425941313|       4.0|
|     1|  58559|   4.0|1425942007|       4.0|
|     1|  59315|   5.0|1425941502|       5.0|
|     1|  68358|   5.0|1425941464|       5.0|
|     1|  69844|   5.0|1425942139|       5.0|
|     1|  73017|   5.0|1425942699|
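
In the output above, 4.5 becomes 5.0 and 3.5 becomes 4.0: Spark's `round` rounds ties away from zero (half-up), and `bround` is the half-even variant. Python's built-in `round` sends ties to the nearest even integer instead, which is worth knowing because the import above reuses the same name. Here is a small standard-library sketch of the difference, meant to be run in a fresh Python session. The `half_up` helper is hypothetical and only imitates half-up rounding with the `decimal` module.

```python
from decimal import Decimal, ROUND_HALF_UP

def half_up(value):
    """Hypothetical helper: round to the nearest integer, ties away from zero."""
    return float(Decimal(str(value)).quantize(Decimal("1"), rounding=ROUND_HALF_UP))

for rating in (3.5, 4.5):
    # The built-in round() sends ties to the nearest even integer.
    print(rating, round(rating), half_up(rating))
```

For 4.5 the built-in gives 4 while the half-up version gives 5.0, matching the `new_rating` column.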

                                                                                

Date/time functions:

In [45]:
from pyspark.sql.functions import to_date, to_timestamp, datediff

In [46]:
ratings.withColumn('new_timestamp', to_timestamp(ratings.timestamp)).show()

+------+-------+------+----------+-------------------+
|userId|movieId|rating| timestamp|      new_timestamp|
+------+-------+------+----------+-------------------+
|     1|    110|   1.0|1425941529|2015-03-09 22:52:09|
|     1|    147|   4.5|1425942435|2015-03-09 23:07:15|
|     1|    858|   5.0|1425941523|2015-03-09 22:52:03|
|     1|   1221|   5.0|1425941546|2015-03-09 22:52:26|
|     1|   1246|   5.0|1425941556|2015-03-09 22:52:36|
|     1|   1968|   4.0|1425942148|2015-03-09 23:02:28|
|     1|   2762|   4.5|1425941300|2015-03-09 22:48:20|
|     1|   2918|   5.0|1425941593|2015-03-09 22:53:13|
|     1|   2959|   4.0|1425941601|2015-03-09 22:53:21|
|     1|   4226|   4.0|1425942228|2015-03-09 23:03:48|
|     1|   4878|   5.0|1425941434|2015-03-09 22:50:34|
|     1|   5577|   5.0|1425941397|2015-03-09 22:49:57|
|     1|  33794|   4.0|1425942005|2015-03-09 23:00:05|
|     1|  54503|   3.5|1425941313|2015-03-09 22:48:33|
|     1|  58559|   4.0|1425942007|2015-03-09 23:00:07|
|     1|  
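
As a sanity check, the same conversion can be reproduced for a single value with the standard library. The sketch below assumes UTC, which is what the values displayed above correspond to. Spark renders timestamps in the session time zone, so the output on another setup may differ.

```python
from datetime import datetime, timezone

unix_seconds = 1425941529  # first `timestamp` value shown above

# Interpret the integer as seconds since the Unix epoch, in UTC.
converted = datetime.fromtimestamp(unix_seconds, tz=timezone.utc)
print(converted.strftime("%Y-%m-%d %H:%M:%S"))  # 2015-03-09 22:52:09
```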

                                                                                

Replace the Unix timestamp column with the converted timestamp:

In [47]:
ratings = ratings.withColumn('timestamp', to_timestamp(ratings.timestamp))

In [48]:
ratings.show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|    110|   1.0|2015-03-09 22:52:09|
|     1|    147|   4.5|2015-03-09 23:07:15|
|     1|    858|   5.0|2015-03-09 22:52:03|
|     1|   1221|   5.0|2015-03-09 22:52:26|
|     1|   1246|   5.0|2015-03-09 22:52:36|
|     1|   1968|   4.0|2015-03-09 23:02:28|
|     1|   2762|   4.5|2015-03-09 22:48:20|
|     1|   2918|   5.0|2015-03-09 22:53:13|
|     1|   2959|   4.0|2015-03-09 22:53:21|
|     1|   4226|   4.0|2015-03-09 23:03:48|
|     1|   4878|   5.0|2015-03-09 22:50:34|
|     1|   5577|   5.0|2015-03-09 22:49:57|
|     1|  33794|   4.0|2015-03-09 23:00:05|
|     1|  54503|   3.5|2015-03-09 22:48:33|
|     1|  58559|   4.0|2015-03-09 23:00:07|
|     1|  59315|   5.0|2015-03-09 22:51:42|
|     1|  68358|   5.0|2015-03-09 22:51:04|
|     1|  69844|   5.0|2015-03-09 23:02:19|
|     1|  73017|   5.0|2015-03-09 23:11:39|
|     1|  81834|   5.0|2015-03-0

                                                                                

Computing the difference in minutes is more tedious: we need to convert both timestamps to Unix timestamps (seconds since the epoch), subtract them, and divide the result by 60.

In [52]:
taxi = taxi.withColumn('trip_duration', (taxi.dropoff_datetime.cast("long") - taxi.pickup_datetime.cast("long"))/60)
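
To make the arithmetic concrete, here is the same calculation for one trip in plain Python. The two timestamps are taken from the first row of the `taxi` output shown below. Treating them as UTC is an assumption made only so that `.timestamp()` is well defined; the difference itself does not depend on the time zone.

```python
from datetime import datetime, timezone

pickup = datetime(2018, 3, 29, 13, 37, 13, tzinfo=timezone.utc)
dropoff = datetime(2018, 3, 29, 14, 17, 1, tzinfo=timezone.utc)

# Same steps as the Spark expression: seconds since epoch, subtract, divide by 60.
pickup_seconds = int(pickup.timestamp())
dropoff_seconds = int(dropoff.timestamp())
trip_duration = (dropoff_seconds - pickup_seconds) / 60
print(trip_duration)  # 39.8
```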

In [53]:
taxi.show(10)

+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|imp_surcharge|total_amount|pickup_location_id|dropoff_location_id|     trip_duration|
+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+------------------+
|        2|2018-03-29 13:37:13|2018-03-29 14:17:01|              1|        18.15|        3|                 N|           1|       70.0|  0.0|    0.0|     16.16|        10.5|          0.3|       96.96|               161|    

                                                                                

### User-Defined Functions (UDF)

PySpark User-Defined Functions (UDFs) let you apply your own Python code to DataFrame columns in a distributed way. They are handy, but beware: they are generally slower than PySpark's built-in functions, because the data has to be moved between the JVM and the Python worker processes.

In [54]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

In [55]:
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Add one to each value using pandas Series arithmetic
    # (no float cast, since the declared return type is 'long').
    return series + 1

In [56]:
taxi.select(taxi.passenger_count, pandas_plus_one(taxi.passenger_count)).show()

+---------------+--------------------------------+
|passenger_count|pandas_plus_one(passenger_count)|
+---------------+--------------------------------+
|              1|                               2|
|              1|                               2|
|              1|                               2|
|              2|                               3|
|              5|                               6|
|              1|                               2|
|              1|                               2|
|              1|                               2|
|              1|                               2|
|              1|                               2|
|              1|                               2|
|              2|                               3|
|              1|                               2|
|              1|                               2|
|              1|                               2|
|              1|                               2|
|              1|              

                                                                                

### DataFrame Operations on Rows

We will show the following in this section:

1. Filtering Rows
2. Get Distinct Rows
3. Sorting Rows

In [57]:
movies.filter(movies.status == 'In Production').show()

+-----+---------------------+------+--------------------+--------------------+------+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+-------+--------------------+-------------+--------------------+--------------------+-----+------------+----------+
|adult|belongs_to_collection|budget|              genres|            homepage|    id|  imdb_id|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|revenue|runtime|    spoken_languages|       status|             tagline|               title|video|vote_average|vote_count|
+-----+---------------------+------+--------------------+--------------------+------+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+-------+-------+----------

                                                                                

In [58]:
ratings.filter(ratings.rating > 4.5).show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|    858|   5.0|2015-03-09 22:52:03|
|     1|   1221|   5.0|2015-03-09 22:52:26|
|     1|   1246|   5.0|2015-03-09 22:52:36|
|     1|   2918|   5.0|2015-03-09 22:53:13|
|     1|   4878|   5.0|2015-03-09 22:50:34|
|     1|   5577|   5.0|2015-03-09 22:49:57|
|     1|  59315|   5.0|2015-03-09 22:51:42|
|     1|  68358|   5.0|2015-03-09 22:51:04|
|     1|  69844|   5.0|2015-03-09 23:02:19|
|     1|  73017|   5.0|2015-03-09 23:11:39|
|     1|  81834|   5.0|2015-03-09 23:02:13|
|     1|  91542|   5.0|2015-03-09 23:10:18|
|     1|  92439|   5.0|2015-03-09 22:50:24|
|     1|  96821|   5.0|2015-03-09 22:49:42|
|     1| 112552|   5.0|2015-03-09 22:48:56|
|     2|    339|   5.0|1997-06-23 04:48:16|
|     2|   1356|   5.0|1997-06-23 04:14:48|
|     4|   1097|   5.0|2003-01-15 21:58:45|
|     4|   1221|   5.0|2003-01-15 21:58:23|
|     4|   2023|   5.0|2003-01-1

                                                                                

Use `&` (and) and `|` (or) to combine conditions, wrapping each condition in its own parentheses:

In [59]:
ratings.filter((ratings.rating < 1.5) | (ratings.rating > 4.5)).show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|    110|   1.0|2015-03-09 22:52:09|
|     1|    858|   5.0|2015-03-09 22:52:03|
|     1|   1221|   5.0|2015-03-09 22:52:26|
|     1|   1246|   5.0|2015-03-09 22:52:36|
|     1|   2918|   5.0|2015-03-09 22:53:13|
|     1|   4878|   5.0|2015-03-09 22:50:34|
|     1|   5577|   5.0|2015-03-09 22:49:57|
|     1|  59315|   5.0|2015-03-09 22:51:42|
|     1|  68358|   5.0|2015-03-09 22:51:04|
|     1|  69844|   5.0|2015-03-09 23:02:19|
|     1|  73017|   5.0|2015-03-09 23:11:39|
|     1|  81834|   5.0|2015-03-09 23:02:13|
|     1|  91542|   5.0|2015-03-09 23:10:18|
|     1|  92439|   5.0|2015-03-09 22:50:24|
|     1|  96821|   5.0|2015-03-09 22:49:42|
|     1|  98809|   0.5|2015-03-09 23:10:40|
|     1| 112552|   5.0|2015-03-09 22:48:56|
|     2|    339|   5.0|1997-06-23 04:48:16|
|     2|    786|   1.0|1997-06-23 04:14:09|
|     2|    788|   1.0|1997-06-2
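
The parentheses around each condition are not optional. In Python, `&` and `|` bind more tightly than comparison operators such as `<` and `>`, so an unparenthesised expression is grouped differently from what was intended. The sketch below shows the grouping with plain integers. With Spark columns the unparenthesised form typically fails with an error rather than silently giving a different answer.

```python
rating = 1

# Intended meaning: each comparison is evaluated first, then combined.
with_parens = (rating < 2) | (rating > 4)

# Without parentheses Python evaluates `2 | rating` first and then treats
# the rest as the chained comparison `rating < (2 | rating) > 4`.
without_parens = rating < 2 | rating > 4

print(with_parens)     # True
print(without_parens)  # False
```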

                                                                                

> 1. Filter `movies` for `status` to be `In Production` and `vote_average` greater than 6.
> 2. Filter `taxi` for `trip_duration` greater than 1 hour, `trip_distance` greater than 10 miles and `passenger_count` less than or equal to 2.

In [60]:
# 1. Filter movies for status 'In Production' and vote_average > 6
# This will show movies that are still in production and have a vote_average greater than 6.
# If this returns no records, it means there are no such movies in your dataset.
movies.filter(
    (movies.status == 'In Production') & (movies.vote_average > 6)
).show()

+-----+---------------------+------+------+--------+---+-------+-----------------+--------------+--------+----------+-----------+--------------------+--------------------+------------+-------+-------+----------------+------+-------+-----+-----+------------+----------+
|adult|belongs_to_collection|budget|genres|homepage| id|imdb_id|original_language|original_title|overview|popularity|poster_path|production_companies|production_countries|release_date|revenue|runtime|spoken_languages|status|tagline|title|video|vote_average|vote_count|
+-----+---------------------+------+------+--------+---+-------+-----------------+--------------+--------+----------+-----------+--------------------+--------------------+------------+-------+-------+----------------+------+-------+-----+-----+------------+----------+
+-----+---------------------+------+------+--------+---+-------+-----------------+--------------+--------+----------+-----------+--------------------+--------------------+------------+-------+-

                                                                                

In [61]:
# 2. Filter taxi for trip_duration > 1 hour, trip_distance > 10 miles, and passenger_count <= 2
# First, create the trip_duration column (in minutes) using pickup and dropoff timestamps.
from pyspark.sql.functions import col

taxi = taxi.withColumn(
    'trip_duration',
    (col('dropoff_datetime').cast('long') - col('pickup_datetime').cast('long')) / 60
)

# Now filter for trips longer than 1 hour, distance > 10 miles, and 2 or fewer passengers.
taxi.filter(
    (col('trip_duration') > 60) &
    (col('trip_distance') > 10) &
    (col('passenger_count') <= 2)
).show()

+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|imp_surcharge|total_amount|pickup_location_id|dropoff_location_id|     trip_duration|
+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+------------------+
|        2|2018-03-29 15:14:59|2018-03-29 16:42:46|              1|        16.98|        3|                 N|           1|       85.0|  0.0|    0.0|      15.0|        12.5|          0.3|       112.8|                87|    

                                                                                

In [62]:
movies.select('status').distinct().show()



+---------------+
|         status|
+---------------+
|              7|
|        Rumored|
|       Released|
|          False|
|           null|
|  In Production|
|            5.0|
|       Canceled|
|        Planned|
|Post Production|
|              0|
|          102.0|
|           80.0|
|              2|
|            5.3|
|              1|
|            5.5|
+---------------+



                                                                                

In [63]:
taxi.orderBy('trip_duration').show()



+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+-------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|imp_surcharge|total_amount|pickup_location_id|dropoff_location_id|      trip_duration|
+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+-------------------+
|        1|2018-02-11 15:15:01|2017-12-24 17:09:53|              2|          4.7|        1|                 N|           1|       25.0|  0.0|    0.5|      5.16|         0.0|          0.3|       30.96|               246| 

                                                                                

In [64]:
taxi.orderBy('trip_distance', ascending=False).show()



+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+-------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|imp_surcharge|total_amount|pickup_location_id|dropoff_location_id|      trip_duration|
+---------+-------------------+-------------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+-------------------+
|        2|2018-07-19 18:00:25|2018-07-23 10:54:16|              1|      7655.76|        1|                 N|           2|     9999.5|  1.0|    0.5|       0.0|         0.0|          0.3|     10001.3|               264| 

                                                                                

### Grouping Data

In [65]:
movies.groupBy('status').count().show()



+---------------+-----+
|         status|count|
+---------------+-----+
|        Rumored|  230|
|       Released|44941|
|              0|    2|
|           null|  254|
|  In Production|   20|
|Post Production|   98|
|          102.0|    1|
|        Planned|   15|
|            5.5|    1|
|           80.0|    1|
|              2|    1|
|              7|    1|
|          False|    1|
|       Canceled|    2|
|            5.3|    1|
|            5.0|    1|
|              1|    2|
+---------------+-----+



                                                                                

In [66]:
ratings.groupBy('movieId').avg('rating').show()



+-------+------------------+
|movieId|       avg(rating)|
+-------+------------------+
|   1580|3.5733178489322874|
|   2366|3.4740872335211956|
|   1591|2.6416020262782967|
|   3175| 3.586550320670942|
|   8638|3.9675026123301986|
|  44022| 3.294874689546173|
|   1645| 3.516589990241182|
|   1959|3.6369782971619364|
|   2122| 2.634513274336283|
|    463|2.8119158878504673|
|    471| 3.654817548175482|
|   1088| 3.239810636881426|
|   6658|  2.88480697384807|
|   7982| 3.607471264367816|
|  32460| 4.014592933947773|
|  36525|3.4823726916620035|
|  54190| 3.613386824324324|
|  68135| 3.097457627118644|
|  96488| 3.970890410958904|
| 135867|3.8857142857142857|
+-------+------------------+
only showing top 20 rows



                                                                                

### Joining DataFrames

In [67]:
ratings_avg_by_movie = ratings.groupBy('movieId').avg('rating')

In [68]:
ratings_avg_by_movie.columns

['movieId', 'avg(rating)']

In [None]:
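# Join on the key column of the aggregated DataFrame (not the original ratings DataFrame)
movies.join(ratings_avg_by_movie, movies.id == ratings_avg_by_movie.movieId, 'inner').select('title', 'vote_average', 'avg(rating)').show()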



### Spark SQL

DataFrames and Spark SQL share the same execution engine, so they can be used interchangeably. For example, you can register a DataFrame as a temporary view and run a SQL query against it:

In [None]:
taxi.createOrReplaceTempView("taxi")
spark.sql("SELECT count(*) from taxi").show()

In addition, UDFs can be registered and invoked in SQL out of the box:

In [None]:
@pandas_udf("double")
def div(s1: pd.Series, s2: pd.Series) -> pd.Series:
    return s1 / s2

In [None]:
spark.udf.register("div", div)
spark.sql("SELECT trip_distance, trip_duration, div(trip_distance, trip_duration) AS speed FROM taxi").show()

In [None]:
taxi.selectExpr("div(trip_distance, trip_duration)").show()