# Practical guide for translating the Snowball model from Pandas to PySpark

The aim of this guide is to provide a worked example of translating Pandas DataFrame operations to PySpark, highlighting any difficulties or benefits along the way.

## Setting up SparkSession in PySpark

To begin, we go through the steps of setting up a so-called _SparkSession_, which is the entry point for working with Spark DataFrames and for connecting to Apache Spark, the analytics engine underlying PySpark. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance, allowing users to execute large-scale data processing.

More information about connecting to Spark using SparkSession can be found at https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html.

Databricks notebooks automatically set up a SparkSession, which can be accessed through the variable named 'spark'.

To run PySpark code locally you must set up a SparkSession manually, which can be done by simply running the following three lines of code.

In [9]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').appName('SparkByExamples.com').getOrCreate()

## Importing .csv file as a Spark DataFrame

There are many ways to build a Spark DataFrame; for our purposes we import one from a .csv file. To make comparisons between Pandas and PySpark operations, we also import the same dataset as a Pandas DataFrame.

In [36]:
import pandas as pd

# record path for example revenue data (a raw string, so the Windows backslashes are not read as escape sequences)
path = r'..\Snowball model Pandas\Alteryx datasets\Snowball example inputs v2.csv'

# import revenue data as a Pandas DataFrame
revenue_data_pd = pd.read_csv(path)

# import .csv file as a PySpark DataFrame (without a schema, every column is read as a string)
revenue_data_ps = spark.read.csv(path, header=True)

## Translating operations in clean_date_columns

Below we define the function clean_date_columns, which was used in our Python Snowball model.

In [37]:
import pandas as pd

def clean_date_columns_pd(dataset, date_columns=['Month']):
    """
        Returns a copy of the dataset with the entries of each date column cast to datetime and normalised to
        have day=1.
            Args:
                dataset (pandas.DataFrame): list of payments.
                date_columns (list of str): titles of columns with dates, entries of which are currently datetime
                                            objects or standard format datetime strings.
            Returns:
                dataset_clean (pandas.DataFrame): list of payments with cleaned date columns.
    """
    # work on a copy so that the DataFrame passed in is left unchanged
    dataset_clean = dataset.copy()

    for date_column in date_columns:
        # cast entries in date_column from string to datetime in case they are not already
        # (plain column assignment replaces the column, so its dtype becomes datetime64)
        dataset_clean[date_column] = pd.to_datetime(dataset_clean[date_column])

        # normalize dates of all payments to be the first day of the given month
        dataset_clean[date_column] = dataset_clean[date_column].dt.to_period('M').dt.to_timestamp()

    return dataset_clean

Given a dataset with one or more date columns, the Pandas version of clean_date_columns carries out the following two operations on each date column:
- Cast its entries to Pandas datetime values (dtype datetime64[ns]).
- Normalize all dates appearing in these date columns to the first day of each given month.
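The two steps can be seen in isolation on a small Series of made-up dates (the values are illustrative only). `to_period('M')` maps each date to its calendar month, and `to_timestamp()` turns each period back into a timestamp at the start of that month.

```python
import pandas as pd

# illustrative date strings, including mid-month and end-of-month days
months = pd.Series(["2019-04-17", "2019-12-01", "2019-12-31"], name="Month")

# step 1: parse strings into datetime64 values
months_dt = pd.to_datetime(months)

# step 2: map each date to its month, then back to the first day of that month
months_clean = months_dt.dt.to_period("M").dt.to_timestamp()

print(months_clean)
```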

These operations can be replicated in PySpark using Date manipulation tools from the pyspark.sql.functions module applied to PySpark SQL DateType objects. In the cell below we define a PySpark version of clean_date_columns.

In [38]:
from pyspark.sql.functions import col, trunc, to_date

def clean_date_columns_ps(dataset, date_columns=['Month']):
    """
        Returns the dataset with the entries of each date column cast to DateType and normalised to
        have the first day of each month. All other columns are kept unchanged.
            Args:
                dataset (Spark DataFrame): dataset with some date columns.
                date_columns (list of str): titles of columns with payment dates, entries of which are currently
                                            dates or standard format date strings.
            Returns:
                dataset_clean (Spark DataFrame): dataset with date columns set to DateType.
    """
    dataset_clean = dataset

    for c in date_columns:
        # cast entries of the date column from String to PySpark DateType
        dataset_clean = dataset_clean.withColumn(c, to_date(col(c)))

        # normalize dates of all payments to be the first day of the given month
        # (withColumn replaces the column and keeps every other column, unlike select)
        dataset_clean = dataset_clean.withColumn(c, trunc(col(c), "month"))

    return dataset_clean

In the cells below we apply both clean_date_columns functions to the respective Pandas and PySpark DataFrames, and check that the desired changes have been made to the date column.

In [40]:
# define list of date column titles
date_columns = ['Month']

# check datatype and content of date column in Pandas DataFrame before applying clean_date_columns_pd
print("Initially, the date column in our Pandas DataFrame has dtype given as follows")
print(revenue_data_pd[date_columns].dtypes)
print("Here is a small sample of entries:")
print(revenue_data_pd[date_columns].head(5))

# check datatype and content of date column in PySpark DataFrame before applying clean_date_columns_ps
print("Initially, the date column in our PySpark DataFrame has datatype given as follows")
revenue_data_ps.select(*date_columns).printSchema()
print("Here is a small sample of entries:")
# show() prints the table itself and returns None, so it is not wrapped in print()
revenue_data_ps.select(*date_columns).show(5)

Initially, the date column in our Pandas DataFrame has dtype given as follows
Month    object
dtype: object
Here is a small sample of entries:
        Month
0  2019-04-01
1  2019-12-01
2  2019-04-01
3  2019-12-01
4  2019-12-01
Initially, the date column in our PySpark DataFrame has datatype given as follows
root
 |-- Month: string (nullable = true)

Here is a small sample of entries:
+----------+
|     Month|
+----------+
|2019-04-01|
|2019-12-01|
|2019-04-01|
|2019-12-01|
|2019-12-01|
+----------+
only showing top 5 rows


In [41]:
# define list of date column titles
date_columns = ['Month']

# clean date column of Pandas DataFrame
revenue_data_pd_clean = clean_date_columns_pd(dataset=revenue_data_pd, date_columns=date_columns)

# clean date column of Spark DataFrame
revenue_data_ps_clean = clean_date_columns_ps(dataset=revenue_data_ps, date_columns=date_columns)

In [42]:
# check datatype and content of date column in Pandas DataFrame after applying clean_date_columns_pd
print("After applying clean_date_columns_pd, the date column in our Pandas DataFrame has dtype given as follows")
print(revenue_data_pd_clean[date_columns].dtypes)
print("Here is a small sample of entries:")
print(revenue_data_pd_clean[date_columns].head(5))

# check datatype and content of date column in PySpark DataFrame after applying clean_date_columns_ps
print("After applying clean_date_columns_ps, the date column in our PySpark DataFrame has datatype given as follows")
revenue_data_ps_clean.select(*date_columns).printSchema()
print("Here is a small sample of entries:")
# show() prints the table itself and returns None, so it is not wrapped in print()
revenue_data_ps_clean.select(*date_columns).show(5)

After applying clean_date_columns_pd, the date column in our Pandas DataFrame has dtype given as follows
Month    datetime64[ns]
dtype: object
Here is a small sample of entries:
       Month
0 2019-04-01
1 2019-12-01
2 2019-04-01
3 2019-12-01
4 2019-12-01
After applying clean_date_columns_ps, the date column in our PySpark DataFrame has datatype given as follows
root
 |-- Month: date (nullable = true)

Here is a small sample of entries:
+----------+
|     Month|
+----------+
|2019-04-01|
|2019-12-01|
|2019-04-01|
|2019-12-01|
|2019-12-01|
+----------+
only showing top 5 rows


## Translating operations in add_start_end_dates