# Basic Operations in PySpark
Notebook to demonstrate the basic operations in Pyspark. Most importantly, demonstrates parallelization across multiple nodes.

In [1]:
# Import Pyspark libraries:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
from pyspark import SparkFiles

In [2]:
# Define spark session:
spark = SparkSession.builder.getOrCreate()

### Basic RDD Creation

In [3]:
from datetime import date
import time

In [6]:
# Creating a basic dataframe with column names:
data = [('James','','Smith',date(1991,4,1),'M',3000),
        ('Michael','Rose','',date(2000,5,19),'M',4000),
        ('Robert','','Williams',date(1978,9,5),'M',4000),
        ('Maria','Anne','Jones',date(1967,12,1),'F',4000),
        ('Jen','Mary','Brown',date(1980,2,17),'F',-1)
]

columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)

df.head(5)

[Row(firstname='James', middlename='', lastname='Smith', dob=datetime.date(1991, 4, 1), gender='M', salary=3000),
 Row(firstname='Michael', middlename='Rose', lastname='', dob=datetime.date(2000, 5, 19), gender='M', salary=4000),
 Row(firstname='Robert', middlename='', lastname='Williams', dob=datetime.date(1978, 9, 5), gender='M', salary=4000),
 Row(firstname='Maria', middlename='Anne', lastname='Jones', dob=datetime.date(1967, 12, 1), gender='F', salary=4000),
 Row(firstname='Jen', middlename='Mary', lastname='Brown', dob=datetime.date(1980, 2, 17), gender='F', salary=-1)]

In [8]:
# Creating a basic dataframe with defined schema:
data_schema = StructType([
    StructField('firstname', StringType(), False),
    StructField('middlename', StringType(), False),
    StructField('lastname', StringType(), False),
    StructField('dob', DateType(), False),
    StructField('gender', StringType(), False),
    StructField('salary', IntegerType(), False)
])

df = spark.createDataFrame(data=data, schema=data_schema)
df.head(5)

[Row(firstname='James', middlename='', lastname='Smith', dob=datetime.date(1991, 4, 1), gender='M', salary=3000),
 Row(firstname='Michael', middlename='Rose', lastname='', dob=datetime.date(2000, 5, 19), gender='M', salary=4000),
 Row(firstname='Robert', middlename='', lastname='Williams', dob=datetime.date(1978, 9, 5), gender='M', salary=4000),
 Row(firstname='Maria', middlename='Anne', lastname='Jones', dob=datetime.date(1967, 12, 1), gender='F', salary=4000),
 Row(firstname='Jen', middlename='Mary', lastname='Brown', dob=datetime.date(1980, 2, 17), gender='F', salary=-1)]

In [4]:
# Reading in larger Dataframe (from Databricks default location):
sqlContext = SQLContext(spark.sparkContext)
dataPath = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"
diamonds = sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema", "true").load(dataPath)



In [5]:
# Use df.show() to get a tabular view:
diamonds.show()

+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|_c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+---+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|  1| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
|  2| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
|  3| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
|  4| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
|  5| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
|  6| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
|  7| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
|  8| 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|
|  9| 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49|
| 10| 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|
| 11|  0.3|     Good|    J|    SI1| 64.0| 55.0|  339|4.25|4.28|2.73|
| 12| 0.23|    Ideal|    J|    VS1

### Basic EDA

In [36]:
# Number of rows:
diamonds.count()

53940

In [37]:
# Number of distinct elements:
diamonds.select('color').distinct().count()

7

In [42]:
# Groupby command:
diamonds.groupBy('color').agg(countDistinct('clarity').alias('clarity_count'), mean('price').alias('avg_price')).orderBy(desc('color')).show()

+-----+-------------+------------------+
|color|clarity_count|         avg_price|
+-----+-------------+------------------+
|    J|            8|  5323.81801994302|
|    I|            8| 5091.874953891553|
|    H|            8| 4486.669195568401|
|    G|            8| 3999.135671271697|
|    F|            8| 3724.886396981765|
|    E|            8|3076.7524752475247|
|    D|            8|3169.9540959409596|
+-----+-------------+------------------+



In [48]:
# Using a Window function:
from pyspark.sql.window import Window
wnd = Window.partitionBy("color").orderBy(desc("price"))

diamonds.withColumn("dense_rank", dense_rank().over(wnd)).select("*").filter("dense_rank <= 5").show()

+-----+-----+---------+-----+-------+-----+-----+-----+----+----+----+----------+
|  _c0|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|dense_rank|
+-----+-----+---------+-----+-------+-----+-----+-----+----+----+----+----------+
|27677| 2.19|    Ideal|    D|    SI2| 61.8| 57.0|18693|8.23|8.49|5.17|         1|
|27668| 2.01|    Ideal|    D|    SI2| 62.1| 56.0|18674|8.02|8.11|5.01|         2|
|27648| 2.11|  Premium|    D|    SI2| 60.9| 60.0|18575|8.28|8.21|5.02|         3|
|27636| 1.04|Very Good|    D|     IF| 61.3| 56.0|18542|6.53|6.55|4.01|         4|
|27628| 2.14|Very Good|    D|    SI2| 60.3| 60.0|18526|8.31|8.43|5.05|         5|
|27721| 2.02|Very Good|    E|    SI1| 59.8| 59.0|18731|8.11| 8.2|4.88|         1|
|27689| 1.51|    Ideal|    E|    VS1| 61.5| 57.0|18729|7.34| 7.4|4.53|         2|
|27684|  2.0|Very Good|    E|    SI1| 60.5| 59.0|18709|8.09|8.14|4.94|         3|
|27678| 1.28|    Ideal|    E|     IF| 60.7| 57.0|18700|7.09|6.99|4.27|         4|
|27638| 1.72|Ver

In [52]:
# Reformating dates:
df.select(col("dob"), date_format(col("dob"), "MM/dd/yyyy").alias("date_format")).show()

+----------+-----------+
|       dob|date_format|
+----------+-----------+
|1991-04-01| 04/01/1991|
|2000-05-19| 05/19/2000|
|1978-09-05| 09/05/1978|
|1967-12-01| 12/01/1967|
|1980-02-17| 02/17/1980|
+----------+-----------+



In [55]:
# Date diff:
df.select(col("dob"), current_date().alias("today"), datediff(current_date(), col("dob")).alias("datediff")).show()

+----------+----------+--------+
|       dob|     today|datediff|
+----------+----------+--------+
|1991-04-01|2023-04-21|   11708|
|2000-05-19|2023-04-21|    8372|
|1978-09-05|2023-04-21|   16299|
|1967-12-01|2023-04-21|   20230|
|1980-02-17|2023-04-21|   15769|
+----------+----------+--------+



### Parallelization using .applyInPandas()
It is possible to use `applyInPandas()` to perform some parallelization tasks that involve data processing. However this is usually not as straightforward as doing a `map()` or `mapPartitions()` operation over a RDD, as is shown in the next example.

In [6]:
import numpy as np
import pandas as pd

In [7]:
# First, repartition along the dimention to apply:
diamonds = diamonds.repartition(8, 'clarity')
print('DF partitions:', diamonds.rdd.getNumPartitions())

DF partitions: 8


In [23]:
# Example of apply function:
def mean_diamond_dims(single_part_df):
    ''' Function takes a partition of the input DF (partitioned by groupby()) as input, returns a Pandas DF as output. '''
    # Groupby value:
    clarity = single_part_df['clarity'].values[0]
    # Column of means:
    mean_xyz = (single_part_df['x'] + single_part_df['y'] + single_part_df['z'])/3
    # Mean of means:
    mean_all = np.mean(mean_xyz)
    # Output:
    return pd.DataFrame([{'clarity':clarity, 'mean_diamond_dimensions':mean_all}])

# Be sure to define the schema of the pandas output correctly:
out_schema = StructType([
    StructField('clarity', StringType()),
    StructField('mean_diamond_dimensions', FloatType())
])

diamonds.groupby('clarity').applyInPandas(mean_diamond_dims, schema = out_schema).show()

+-------+-----------------------+
|clarity|mean_diamond_dimensions|
+-------+-----------------------+
|   VVS2|               4.557346|
|     I1|              5.8927937|
|     IF|               4.339963|
|    VS1|              4.8650045|
|   VVS1|              4.3322444|
|    VS2|              4.9360156|
|    SI1|               5.138828|
|    SI2|               5.582558|
+-------+-----------------------+



In [8]:
# Parallelizing some process:
def some_complex_func(partial_df):
    # Simulating some long-running function or process:
    time.sleep(10)
    return pd.DataFrame([{'Output_Val':np.random.uniform()}])

# Output schema:
out_schema = StructType([
    StructField('Output_Val', FloatType())
])

t0 = time.time()
diamonds.groupby('clarity').applyInPandas(some_complex_func, schema = out_schema).show()
print('Total runtime:', time.time() - t0)

+----------+
|Output_Val|
+----------+
|0.35957536|
|0.35957536|
|0.50110745|
|0.72847253|
|0.50110745|
|0.52407885|
| 0.7612426|
|0.66044647|
+----------+

Total runtime: 52.50363731384277


### Parallelization using map() and mapPartitions()
The `map()` and `mapPartitions()` operations are the basic workhorses of parallelization in Spark. Very often, this is the better choice when compared with `applyInPandas()`.

In [None]:
# Define spark session and context:
spark = SparkSession.builder.appName('loan_vectors').getOrCreate()
sc = spark.sparkContext

In [1]:
# Check the number of executors available in our spark session:
print(sc._jsc.sc().getExecutorMemoryStatus().size())

8


In [3]:
# Example of a basic RDD map(), which applies a function over each element of the RDD:
rdd = sc.parallelize([1, 2, 3, 4])
rdd_calc = rdd.map(lambda x: x**2)
print(rdd_calc.collect())

[1, 4, 9, 16]


In [5]:
# A more advanced example using map(), this time the RDD is a partition of input lists:

# Calculation functions for parallelizing:
def add_func(x, y):
    time.sleep(2)
    return x+y

def spark_run(input_list):
    out_list = [add_func(i[0], i[1]) for i in input_list]
    return out_list

# Define input list
list1 = [(1, 1), (2, 1), (3, 2), (4, 10)]

# Start of processing:
t0 = time.time()
workers = 8
partitions = [[(p[0]+i, p[1]+i) for p in list1] for i in range(workers)]
rdd = sc.parallelize(partitions)
rdd_out = rdd.map(lambda x: spark_run(x))
print(rdd_out.collect())
print('Runtime:', time.time() - t0)

[[2, 3, 5, 14], [4, 5, 7, 16], [6, 7, 9, 18], [8, 9, 11, 20],
 [10, 11, 13, 22], [12, 13, 15, 24], [14, 15, 17, 26], [16, 17, 19, 28]]
Runtime: 9.748255252838135


In [6]:
# Unlike map(), mapPartitions() applies a function to each partition of a RDD rather than each element:
def sum_partition(iterator):
    yield sum(iterator)

rdd = sc.parallelize([1, 2, 3, 4], 2)
sum_rdd = rdd.mapPartitions(sum_partition)
print(sum_rdd.collect())

[3, 7]
