# Programming Exercise using PySpark

## Background:
A very small company called **KommatiPara** that deals with bitcoin trading has two separate datasets about its clients that it wants to collate in order to start interfacing more with them. One dataset contains information about the clients and the other contains their financial details.

The company now needs a dataset containing the emails of its clients from the United Kingdom and the Netherlands, along with some of their financial details, to start reaching out to them for a new marketing push.

Since all the data in the datasets is fake and this is just an exercise, one can forego the issue of having the data stored along with the code in a code repository.


## Things to be aware of:

- Use Python **3.8**
- Avoid using notebooks, such as **Jupyter**. While they are good for interactive work and prototyping, they shouldn't be used in this case.
- There's no need to use classes: the assignment is small and not very complex, so classes are unnecessary.
- Only use clients from the **United Kingdom** or the **Netherlands**.
- Remove personally identifiable information from the first dataset, **excluding emails**.
- Remove the credit card number from the second dataset.
- Data should be joined using the **id** field.
- Rename the columns for easier readability for the business users:

|Old name|New name|
|--|--|
|id|client_identifier|
|btc_a|bitcoin_address|
|cc_t|credit_card_type|

- The project should be stored in GitHub and you should only commit relevant files to the repo.
- Save the output in a **client_data** directory in the root directory of the project.
- Add a **README** file explaining at a high level what the application does.
- The application should receive three arguments: the paths to the two dataset files and the countries to filter on, since the client wants to reuse the code for other countries.
- Use **logging**.
- Create generic functions for filtering data and renaming.
  Recommendation: use the following package for Spark tests: https://github.com/MrPowers/chispa
- If possible, have different branches for different tasks that once completed are merged to the main branch. Follow the GitHub flow - https://guides.github.com/introduction/flow/.
- **Bonus** - If possible it should have an automated build pipeline using GitHub Actions - https://docs.github.com/en/actions - or Travis - https://www.travis-ci.com/ for instance.
- **Bonus** - If possible log to a file with a rotating policy.
- **Bonus** - Code should be able to be packaged into a source distribution file.
- **Bonus** - Requirements file should exist.
- **Bonus** - Document the code with docstrings as much as possible using the reStructuredText (reST) format.
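
The argument-handling and rotating-log requirements above could be sketched with the standard library alone. This is a minimal sketch, not a reference solution for the assignment: the option names (`--clients`, `--financials`, `--countries`), the logger name, the default log file `app.log`, and the rotation limits are all assumptions chosen for illustration.

```python
import argparse
import logging
from logging.handlers import RotatingFileHandler


def parse_args(argv=None):
    """Parse the three inputs: two dataset paths and the countries to keep."""
    parser = argparse.ArgumentParser(description="Collate client datasets.")
    # Option names are hypothetical; the assignment only asks for three arguments
    parser.add_argument("--clients", required=True, help="path to the client dataset")
    parser.add_argument("--financials", required=True, help="path to the financial dataset")
    parser.add_argument("--countries", nargs="+", required=True,
                        help="one or more countries to keep")
    return parser.parse_args(argv)


def build_logger(log_path="app.log", max_bytes=1_000_000, backup_count=3):
    """Return a logger that writes to a size-rotated file (limits are illustrative)."""
    logger = logging.getLogger("client_data")
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid adding duplicate handlers on repeated calls
        handler = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=backup_count)
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


# Passing an explicit list makes the parser easy to try out and to test
args = parse_args([
    "--clients", "dataset_one.csv",
    "--financials", "dataset_two.csv",
    "--countries", "United Kingdom", "Netherlands",
])
print(args.countries)
```

In a real entry point, `parse_args()` would be called without arguments so that it reads `sys.argv`, and `build_logger()` would be called once at start-up before any Spark work begins.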


In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
import shutil

# Initialize a SparkSession
spark = SparkSession.builder.appName("JoinDatasets").getOrCreate()

df1 = spark.read.option('header', True).csv('dataset_one.csv')
df2 = spark.read.option('header', True).csv('dataset_two.csv')

# Perform join based on the "id" column
df_joined = df1.join(df2, on="id", how="inner")
# Drop the credit card number and the personally identifiable name columns
df_joined = df_joined.drop("cc_n")\
                     .drop("first_name")\
                     .drop("last_name")
# Filter with UK and NL
filtered_df=df_joined.filter((col("country") == "United Kingdom") | (col("country") == "Netherlands"))

filtered_df = filtered_df.withColumnRenamed("id","client_identifier")\
                         .withColumnRenamed("btc_a","bitcoin_address")\
                         .withColumnRenamed("cc_t", "credit_card_type")

# Show the result 
# filtered_df.show()

output_folder = "client_data"

# Create the client_data folder if it doesn't exist
os.makedirs(output_folder, exist_ok=True)

# output_path = "client_data/dataset_three.csv"
# filtered_df.write.mode("overwrite").csv(output_path, header=True)

# output_path2 = "client_data/dataset_three11.csv"
# filtered_df.coalesce(1).write.option("header","true").mode("overwrite").format("csv").save(output_path2)

location = "client_data"
filename = "dataset_three22.csv"

def WriteCsvToLocation(dataframe, location, filename):
    """Write the DataFrame as a single CSV file named `filename` inside `location`."""

    filePathDestTemp = os.path.join(location, "tmp")

    # Spark writes a directory of part files; coalesce(1) yields a single part file
    dataframe.coalesce(1).write.option("header","true").mode("overwrite").format("csv").save(filePathDestTemp)

    # Find the part file that Spark wrote into the temporary directory
    name = ''
    for fileName in os.listdir(filePathDestTemp):
        if fileName.endswith('.csv'):
            name = fileName
            break

    if not name:
        raise FileNotFoundError("No CSV part file found in " + filePathDestTemp)

    # Copy the part file to its final name, then remove the temporary directory
    shutil.copy(os.path.join(filePathDestTemp, name), os.path.join(location, filename))
    shutil.rmtree(filePathDestTemp)


WriteCsvToLocation(filtered_df,location,filename)


# TODO: create generic functions for filtering data and renaming.
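
The generic filtering and renaming functions that the assignment asks for could look roughly like the sketch below. The function names `filter_by_values` and `rename_columns` are hypothetical, and `df` is assumed to be a PySpark DataFrame; the sketch relies only on `df[column]`, `Column.isin`, `DataFrame.filter` and `DataFrame.withColumnRenamed`.

```python
def filter_by_values(df, column, values):
    """Keep only the rows whose `column` value is one of `values`."""
    # df[column] yields a Column; isin() builds the membership condition
    return df.filter(df[column].isin(list(values)))


def rename_columns(df, mapping):
    """Rename columns according to an {old_name: new_name} mapping."""
    for old_name, new_name in mapping.items():
        # withColumnRenamed leaves the DataFrame unchanged if old_name is absent
        df = df.withColumnRenamed(old_name, new_name)
    return df
```

With these in place, the hard-coded steps in the cell above would reduce to `filter_by_values(df_joined, "country", ["United Kingdom", "Netherlands"])` followed by `rename_columns(..., {"id": "client_identifier", "btc_a": "bitcoin_address", "cc_t": "credit_card_type"})`, and the country list could come straight from the command line.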

In [13]:
# Example of reading a file (CSV here; Parquet files are read with spark.read.parquet)
# TODO: play around and read a parquet file

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Read parquet files").getOrCreate()

df = spark.read.csv('dataset_one.csv')

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, when, count

# Spark is a little special
# it needs the session to be up before we can start with data processing
spark = SparkSession.builder \
    .appName("Don't worry about the session for now") \
    .getOrCreate()

result = spark.read \
    .option('header', True) \
    .csv('dataset_one.csv') \
    .filter(col('id') > 0) \
    .withColumn('country_code',
        when(col('country').startswith('United Kingdom'), lit('UK'))
            .when(col('country').startswith('Netherlands'), lit('NL'))
            .otherwise(lit('other country'))
    ) \
    .groupBy('country_code') \
    .agg(count('*').alias('cnt'))

result.show()

24/09/15 12:39:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------------+---+
| country_code|cnt|
+-------------+---+
|           NL| 62|
|           UK| 38|
|other country|900|
+-------------+---+



In [1]:
from pyspark.sql import SparkSession

# Initialize a SparkSession
spark = SparkSession.builder.appName("JoinDatasets").getOrCreate()

# Define the schema for both datasets
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Schema for the first dataset
schema1 = StructType([
    StructField("id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("country", StringType(), True)
])

# Schema for the second dataset
schema2 = StructType([
    StructField("id", IntegerType(), True),
    StructField("btc_a", StringType(), True),
    StructField("cc_t", StringType(), True),
    StructField("cc_n", StringType(), True)
])

# Sample data for the first dataset
data1 = [
    (1, "Feliza", "Eusden", "feusden0@ameblo.jp", "France"),
    (2, "Priscilla", "Le Pine", "plepine1@biglobe.ne.jp", "France"),
    (3, "Jaimie", "Sandes", "jsandes2@reuters.com", "France"),
    (4, "Nari", "Dolphin", "ndolphin3@cbslocal.com", "France"),
    (5, "Garik", "Farre", "gfarre4@economist.com", "France"),
    (6, "Kordula", "Broodes", "kbroodes5@amazon.de", "France"),
    (7, "Rakel", "Ingliby", "ringliby6@ft.com", "United States")
]

# Sample data for the second dataset
data2 = [
    (1, "1wjtPamAZeGhRnZfhBAHHHjNvnHefd2V2", "visa-electron", "4175006996999270"),
    (2, "1Js9BA1rV31hJFmN25rh8HWfrrYLXAyw9T", "jcb", "3587679584356527"),
    (3, "1CoG9ciLQVQCnia5oXfXPSag4DQ4iYeSpd", "diners-club-enroute", "201876885481838"),
    (4, "1GNvinVKGzPBVNZScNA2jKnDSBs4R7Y3rY", "switch", "564182038040530730"),
    (5, "1DHTzZ7ypu3EzWtLBFiWoRo9svd1STMyrg", "jcb", "3555559025151828"),
    (6, "1LWktvit3XBCJNrsji7rWj2qEa5XAmyJiC", "jcb", "3580083825272493"),
    (7, "1J71SRGqUjhqPuHaZaG8wEtKdNRaKUiuzm", "switch", "491193585665108260")
]

# Create DataFrames for both datasets
df1 = spark.createDataFrame(data1, schema=schema1)
df2 = spark.createDataFrame(data2, schema=schema2)

# Load the full datasets from CSV; this replaces the sample DataFrames created above
df1 = spark.read.option('header', True).csv('dataset_one.csv')
df2 = spark.read.option('header', True).csv('dataset_two.csv')

# Perform the join operation based on the "id" column
df_joined = df1.join(df2, on="id", how="inner")

# Show the result of the join
df_joined.show()

24/09/15 12:53:43 WARN Utils: Your hostname, LCE64920 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/09/15 12:53:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/15 12:53:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/15 12:53:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


+---+----------+----------+--------------------+--------------+--------------------+--------------------+------------------+
| id|first_name| last_name|               email|       country|               btc_a|                cc_t|              cc_n|
+---+----------+----------+--------------------+--------------+--------------------+--------------------+------------------+
|  1|    Feliza|    Eusden|  feusden0@ameblo.jp|        France|1wjtPamAZeGhRnZfh...|       visa-electron|  4175006996999270|
|  2| Priscilla|   Le Pine|plepine1@biglobe....|        France|1Js9BA1rV31hJFmN2...|                 jcb|  3587679584356527|
|  3|    Jaimie|    Sandes|jsandes2@reuters.com|        France|1CoG9ciLQVQCnia5o...| diners-club-enroute|   201876885481838|
|  4|      Nari|   Dolphin|ndolphin3@cbsloca...|        France|1GNvinVKGzPBVNZSc...|              switch|564182038040530730|
|  5|     Garik|     Farre|gfarre4@economist...|        France|1DHTzZ7ypu3EzWtLB...|                 jcb|  3555559025151828|


### Reading CSV Files
- `spark.read.csv("path/to/file.csv", inferSchema=True, header=True)`

Since a CSV file is just text, we need to perform some additional steps if we want to know both the column names and the data type of each column.

In the example above, `inferSchema` determines if Spark should automatically detect the data types of each column based on values, `header` specifies if the first row of data is a header.
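
To see why these options are needed, it helps to look at a CSV file without Spark. The sketch below uses only Python's standard `csv` module on a made-up two-row sample (the column names and values are invented for illustration and are not taken from the taxi dataset). Every field comes back as a string, and the header is just the first row until the reader decides to treat it as one.

```python
import csv
import io

# A tiny in-memory CSV; the content is invented for illustration
sample = "id,trip_distance\n1,2.5\n2,0.8\n"

rows = list(csv.reader(io.StringIO(sample)))
header, data = rows[0], rows[1:]

print(header)  # the first row is ordinary text until we decide it is a header
print(data)    # every value is a string; nothing marks 2.5 as a number

# Turning text into typed values is an explicit step; in Spark this is the
# job of inferSchema or of a hand-written schema
typed = [(int(i), float(d)) for i, d in data]
print(typed)
```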

You can also explicitly define the schema when reading data. This can significantly increase the efficiency of data processing by avoiding the overhead of schema inference.

Here is how:
- Import necessary types from `pyspark.sql.types`.
- You can define the schema using PySpark's `StructType` and `StructField`.
- Define the schema and use it in the `read.csv()` method.

In [5]:
# Let's see what happens when we don't use 
# the header and inferSchema options

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("No Schema Example").getOrCreate()

# Read the CSV file without any options
df = spark.read.csv("data/taxi_trips_shortened.csv")
df.printSchema()

24/09/14 16:24:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)



In [4]:
# Now it's time to use the header option

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Header Example").getOrCreate()

# Read the CSV file, treating the first row as the header
df = spark.read.csv("data/taxi_trips_shortened.csv", header=True)
df.printSchema()

24/04/15 13:40:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)



In [5]:
# Now it's time to infer the schema
# We don't want all columns to be of string type!

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Infer Schema Example").getOrCreate()

# Read the CSV file with a header and let Spark infer the column types
df = spark.read.csv("data/taxi_trips_shortened.csv", header=True, inferSchema=True)
df.printSchema()

24/04/15 13:40:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: integer (nullable = true)
 |-- extra: integer (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: integer (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: string (nullable = true)



                                                                                

In [None]:
# Exercise!
# Is the inferred schema correct?
# Is it the same as the one received from the parquet file?

In [6]:
# Schema inferring can be costly and/or incorrect, 
# let's specify the schema by hand!

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType,
    IntegerType, TimestampType, BooleanType, DoubleType,
)

spark = SparkSession.builder.appName("Schema by hand Example").getOrCreate()

# Define the schema
schema = StructType([
        StructField("VendorID", IntegerType(), True),
        StructField("lpep_pickup_datetime", TimestampType(), True),
        StructField("lpep_dropoff_datetime", TimestampType(), True),
        StructField("store_and_fwd_flag", BooleanType(), True),
        StructField("RatecodeID", IntegerType(), True),
        StructField("PULocationID", IntegerType(), True),
        StructField("DOLocationID", IntegerType(), True),
        StructField("passenger_count", IntegerType(), True),
        StructField("trip_distance", DoubleType(), True),
        StructField("fare_amount", IntegerType(), True),
        StructField("extra", IntegerType(), True),
        StructField("mta_tax", DoubleType(), True),
        StructField("tip_amount", IntegerType(), True),
        StructField("tolls_amount", DoubleType(), True),
        StructField("improvement_surcharge", DoubleType(), True),
        StructField("total_amount", DoubleType(), True),
        StructField("payment_type", IntegerType(), True),
        StructField("trip_type", StringType(), True),
])

df = spark.read.csv("data/taxi_trips_shortened.csv", header=True, schema=schema)
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: boolean (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: integer (nullable = true)
 |-- extra: integer (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: integer (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: string (nullable = true)



24/04/15 13:41:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.



## Writing Data in PySpark

Writing data files involves specifying the format and path. We can also manage how the data is partitioned and the number of output files.

To write files to local or external storage, we call the appropriate method on the `write` attribute of the DataFrame (a `DataFrameWriter`), not on the Spark session.

### Writing CSV Files
- `df.write.partitionBy("column_name").csv("path/to/output", mode="overwrite")`

### Writing Parquet Files
- `df.write.parquet("path/to/output", mode="overwrite", partitionBy="column_name")`

`partitionBy` allows you to specify a column to partition the data by. Data for each partition will be stored in a separate subdirectory named `<partition_column_name>=<column_value>`.
        
**In Spark, reading or writing a "file" often means reading or writing a directory structure!**

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Writing data example").getOrCreate()

# Let's read some data
df = spark.read.parquet("data/taxi_trips_shortened.parquet")

# Example of writing a DataFrame to Parquet, partitioned by VendorID
df.write.parquet(
    "data/partitioned_data", mode="overwrite", partitionBy="VendorID")

                                                                                

Investigate the `data/partitioned_data` directory and see its structure for yourself! You should see two subdirectories: the first contains only the rows where `VendorID==1` and the second only the rows where `VendorID==2`.

This kind of split, called partitioning, is one of the most effective ways to improve read performance on big datasets, because queries that filter on the partition column can skip the other subdirectories entirely.
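
One way to inspect a partitioned output directory from Python is sketched below. The helper name `list_partitions` is hypothetical, and it only looks at the top level of the directory, which is enough when the data is partitioned by a single column as in the example above.

```python
from pathlib import Path


def list_partitions(root):
    """Map each partition column to the sorted values found as `column=value` subdirectories."""
    partitions = {}
    for entry in Path(root).iterdir():
        # Partition directories are named <column>=<value>; skip plain files such as _SUCCESS
        if entry.is_dir() and "=" in entry.name:
            column, value = entry.name.split("=", 1)
            partitions.setdefault(column, []).append(value)
    return {column: sorted(values) for column, values in partitions.items()}
```

Calling `list_partitions("data/partitioned_data")` after running the write above should report a single `VendorID` key whose values match the subdirectories you see on disk. Note that the values come back as strings, because they are parsed from directory names.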

## Data Compression in PySpark

Compression is an important technique for managing the storage and speed of data processing. PySpark supports various compression codecs for both CSV and Parquet formats.

### CSV Compression
- CSV files can be compressed with the following codecs: bzip2, gzip, lz4, snappy, and deflate. 
- Example: `df.write.csv("path/to/output.csv", compression="gzip")`

### Parquet Compression
- Parquet files inherently support efficient compression and encoding schemes. The default codec is snappy (you don't have to specify it), but others like gzip and lzo are also supported.
- Example: `df.write.parquet("path/to/output.parquet", compression="snappy")`

Using the right compression technique can significantly reduce the disk space used and potentially speed up the read/write operations.
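
The effect on size is easy to observe without Spark. The sketch below compresses some repetitive CSV text with Python's standard `gzip` module; the rows are invented for illustration, and the ratio you get on real data will depend on how repetitive that data is.

```python
import gzip

# Build some repetitive CSV text in memory; the rows are invented for illustration
lines = ["VendorID,payment_type,trip_type"] + ["2,1,1"] * 1000
raw = ("\n".join(lines) + "\n").encode("utf-8")

compressed = gzip.compress(raw)

print(len(raw), "bytes uncompressed")
print(len(compressed), "bytes after gzip")

# Compression is lossless: decompressing restores the original bytes
assert gzip.decompress(compressed) == raw
```

Highly repetitive input like this compresses far better than typical data, so treat the numbers as an upper bound on what to expect rather than a benchmark.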


## Exercises

Now, practice your skills with these exercises.

1. Read a CSV file and infer its schema.
2. Read a Parquet file and show its first 10 rows.
3. Write a DataFrame to CSV format, partitioned by a specific column.
4. Write a DataFrame to Parquet format and control the number of output files.
5. Read multiple CSV files from a directory, combine them into one DataFrame, and write to a Parquet file.
6. Experiment with different data partitioning strategies when writing a large DataFrame to multiple output files. A big DataFrame can be created from the `data/taxi_trips_full.parquet` file.
7. Compare the sizes of CSV and Parquet files that store the same data, with and without compression! Experiment with different codecs! Do all codecs work without any additional dependencies?