# Preprocessing

## Loading our data from S3

In [None]:
import boto3
from pyspark.sql import functions as F

In [None]:
# Notebook-wide S3 settings
S3_RESOURCE = 's3'  # service name passed to `boto3.resource`
SCHEME = 's3a'  # default URI scheme used by `get_s3_path`
# TODO: define your BUCKET_NAME and PREFIX
BUCKET_NAME = ''
PREFIX = ''
### BEGIN STRIP ###
BUCKET_NAME = 'nibble-datasets'
PREFIX = ''
### END STRIP ###
INPUT_FILENAME = 'youtube-playlog.csv'  # CSV file loaded into `playlog`

In [None]:
def get_s3_path(key, bucket_name=BUCKET_NAME, scheme=SCHEME):
    """Build the URI of an object stored in a bucket.

    Args:
        key: object key inside the bucket. Leading slashes are dropped so
            that a key built from an empty prefix (e.g. ``'/file.csv'``)
            does not produce a double slash after the bucket name.
        bucket_name: name of the bucket (defaults to ``BUCKET_NAME``).
        scheme: URI scheme (defaults to ``SCHEME``).

    Returns:
        The string ``'<scheme>://<bucket_name>/<key>'``.
    """
    clean_key = str(key).lstrip('/')
    return f"{scheme}://{bucket_name}/{clean_key}"

In [None]:
# Create an S3 resource, then a Bucket handle from that same resource
s3 = boto3.resource(S3_RESOURCE)
bucket = s3.Bucket(BUCKET_NAME)

In [None]:
# TODO: load the data into a PySpark DataFrame: `playlog`
### BEGIN STRIP ###
# Read the CSV with a header row and let Spark infer the column types
csv_reader = spark.read.format('csv').options(header=True, inferSchema=True)
playlog = csv_reader.load(get_s3_path(f'{PREFIX}/{INPUT_FILENAME}'))
### END STRIP ###
playlog.show(5)

## First analysis

In [None]:
# TODO: print out our DataFrame's schema
### BEGIN STRIP ###
# Prints each column of `playlog` with its data type
playlog.printSchema()
### END STRIP ###

In [None]:
# TODO: use `.describe(...)` on your DataFrame
### BEGIN STRIP ###
# Spark computes the summary statistics; the small result is converted
# to pandas for display
summary_stats = playlog.describe()
summary_stats.toPandas()
### END STRIP ###

Unnamed: 0,summary,timestamp,user,song
0,count,25739537.0,25739537.0,25739537
1,mean,1442700656.1045842,12697.352275450798,2.532571778181818E8
2,stddev,34432848.72366753,13094.065905828438,8.334645614940468E8
3,min,-139955897.0,0.0,---AtpxbkaE
4,max,1554321113.0,45903.0,zzzcFgRMY6c


### Missing values check

In [None]:
# TODO: count the missing values for each column
#       put the result in a pandas DataFrame and print it out
### BEGIN STRIP ###
import pandas as pd

# Count the nulls of every column in a single aggregation: one pass over
# the data instead of one filter + count job per column.
# `when` without `otherwise` yields null for non-matching rows and `count`
# ignores nulls, so each aggregate counts only the rows where the column
# is null.
null_counts = playlog.agg(
  *[F.count(F.when(playlog[c].isNull(), 1)).alias(c) for c in playlog.columns]
).first()

pd.DataFrame(
  [list(null_counts)], index=['missing values'], columns=playlog.columns
)
### END STRIP ###

Unnamed: 0,timestamp,user,song
missing values,0,0,0


### Duplicates check

In [None]:
# TODO: check if playlog without duplicates has the same number of rows as the original
### BEGIN STRIP ###
# True only when dropping duplicates removes no row
n_rows = playlog.count()
n_distinct_rows = playlog.dropDuplicates().count()
n_rows == n_distinct_rows
### END STRIP ###

It seems we have duplicates; let's count how many.

In [None]:
# TODO: figure out a way to count the number of duplicates
### BEGIN STRIP ###
# Rows removed by deduplication are the extra copies of already-seen rows
n_rows = playlog.count()
n_distinct_rows = playlog.dropDuplicates().count()
n_rows - n_distinct_rows
### END STRIP ###

In [None]:
# BONUS: compute descriptive statistics for duplicates
#        How many duplicates on average (for duplicated rows)?
#        And the standard deviation?
#        What's the min? The max?
#
# WARNING: this might be difficult
### BEGIN STRIP ###
# Grouping by every column puts identical rows together, so the size of a
# group is the number of times that row occurs. Counting the rows directly
# avoids building a list of values per group only to measure its length.
# Groups of size 1 are not duplicated and are filtered out.
playlog.groupBy(playlog.columns) \
  .agg(F.count('*').alias('ids_size')) \
  .where(F.col('ids_size') > 1) \
  .describe().toPandas()
### END STRIP ###

Unnamed: 0,summary,ids_size
0,count,80352.0
1,mean,2.538866487455197
2,stddev,1.6539289063819824
3,min,2.0
4,max,66.0


### Other checks

In [None]:
# TODO: order the dataframe by ascending `timestamp` and show the first 5 rows
### BEGIN STRIP ###
# Smallest timestamps first
playlog.sort(F.col('timestamp').asc()).show(n=5)
### END STRIP ###

Do you see anything suspicious?

---

The first timestamp is negative, and it seems to be the only one.  
Let's make sure there aren't any others like it.

In [None]:
# TODO: count the number of rows with a negative timestamp
### BEGIN STRIP ###
is_negative_timestamp = F.col('timestamp') < 0
playlog.filter(is_negative_timestamp).count()
### END STRIP ###

As expected, there is only one negative timestamp. Since it is a single row, we can safely `.collect(...)` it.

In [None]:
# TODO: collect the problematic rows
### BEGIN STRIP ###
is_negative_timestamp = F.col('timestamp') < 0
playlog.filter(is_negative_timestamp).collect()
### END STRIP ###

There is only one problematic value among more than 25M rows. This negative timestamp is an error, so the real value is effectively missing. We could try to reconstruct it, but that would be tedious; since it is a single row out of 25M, we will simply remove it.

## Removing the row with a negative timestamp

We will use our new knowledge about the data to perform some preprocessing.  

Our pipeline will have 2 steps:
1. Remove duplicates (123651 rows)
2. Remove the row with a negative timestamp (1 row)

We will call our new DataFrame `playlog_processed` and save it to S3 in parquet format.

In [None]:
# TODO: filter out:
#       - duplicated values
#       - rows with negative timestamp
#       and save the result to a new DataFrame: `playlog_processed`
#       Finally, print out the number of rows in this DataFrame
### BEGIN STRIP ###
# Step 1: keep a single copy of each row
deduplicated = playlog.dropDuplicates()
# Step 2: keep only the rows whose timestamp is not negative
has_negative_timestamp = F.col('timestamp') < 0
playlog_processed = deduplicated.where(~has_negative_timestamp)
playlog_processed.count()
### END STRIP ###

In [None]:
# TODO: define your OUTPUT_BUCKET_NAME and OUTPUT_PREFIX
OUTPUT_BUCKET_NAME = ''
OUTPUT_PREFIX = ''

### BEGIN STRIP ###
OUTPUT_BUCKET_NAME = 'nibble-clients'
OUTPUT_PREFIX = 'jedha'
### END STRIP ###
OUTPUT_PROJECT_FOLDER = 'PySpark-Cloud/data'

# Key of the parquet output inside the output bucket
output_path = f'{OUTPUT_PREFIX}/{OUTPUT_PROJECT_FOLDER}/processed/playlog_processed.parquet'

# TODO: save the processed DataFrame to S3 using the parquet format
### BEGIN STRIP ###
# `mode='overwrite'` replaces any existing output, so the cell can be re-run
playlog_processed.write \
  .parquet(get_s3_path(output_path, bucket_name=OUTPUT_BUCKET_NAME), mode='overwrite')
### END STRIP ###