# Ingesting data into Redshift

## Loading our data from S3

In [0]:
import boto3

In [0]:
S3_RESOURCE = 's3'
SCHEME = 's3a'
# TODO: set up your own bucket name and prefix
BUCKET_NAME = 'Project3'
PREFIX = 'P3'
### BEGIN STRIP ###
ACCESS_KEY_ID = "ACCESS_KEY_ID_JEDHA" # student account access key
SECRET_ACCESS_KEY = "SECRET_ACCESS_KEY_JEDHA" # student account secret key

hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") 

items_tidy_path = "s3://full-stack-bigdata-datasets/Big_Data/YOUTUBE/items_tidy.parquet"

### END STRIP ###
INPUT_FILENAME = 'items_tidy.parquet'


# Note: the real ACCESS_KEY_ID and SECRET_ACCESS_KEY have been removed from the script for security reasons

In [0]:
# We define a helper function to easily get the full S3 path of an object, given its key
def get_s3_path(key, bucket_name=BUCKET_NAME, scheme=SCHEME):
  return f"{scheme}://{bucket_name}/{key}"
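As a usage sketch, the helper can build the full path of the input file from `PREFIX` and `INPUT_FILENAME`. The bucket name below is hypothetical. Current S3 naming rules require lowercase bucket names, so a new bucket could not be called `Project3`. Storing the file under `PREFIX/` is also only an assumption about your bucket layout; the notebook itself reads from a fixed dataset path.

```python
# Hypothetical values: replace with your own bucket and prefix
BUCKET_NAME = "my-project3-bucket"
PREFIX = "P3"
SCHEME = "s3a"
INPUT_FILENAME = "items_tidy.parquet"


def get_s3_path(key, bucket_name=BUCKET_NAME, scheme=SCHEME):
    return f"{scheme}://{bucket_name}/{key}"


# Assumed layout: the input file sits under the PREFIX "folder"
input_path = get_s3_path(f"{PREFIX}/{INPUT_FILENAME}")
print(input_path)  # s3a://my-project3-bucket/P3/items_tidy.parquet

# The scheme can be overridden, e.g. to get a plain s3:// URI
print(get_s3_path("P3/other.parquet", scheme="s3"))  # s3://my-project3-bucket/P3/other.parquet
```

The resulting `input_path` is the kind of string you would pass to `spark.read.parquet(...)`.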

In [0]:
# We create an S3 resource and a Bucket from this same resource
s3 = boto3.resource(S3_RESOURCE)
bucket = s3.Bucket(BUCKET_NAME)

In [0]:
# TODO: load the parquet file into a PySpark DataFrame: `df`
# NOTE: as a reminder, parquet is the default file format for loading with PySpark
#
# TODO: as a sanity check, count the rows in the DataFrame
### BEGIN STRIP ###
df = spark.read.parquet(items_tidy_path)
df.count()
### END STRIP ###

In [0]:
# TODO: print out our DataFrame's schema
### BEGIN STRIP ###
df.printSchema()
### END STRIP ###

In [0]:
# TODO: display the first 5 rows of df
### BEGIN STRIP ###
df.limit(5).toPandas()
### END STRIP ###

| | items.contentDetails.caption | items.contentDetails.contentRating.ytRating | items.contentDetails.definition | items.contentDetails.dimension | items.contentDetails.duration | items.contentDetails.licensedContent | items.contentDetails.projection | items.etag | items.id | items.kind | ... | items.statistics.dislikeCount | items.statistics.favoriteCount | items.statistics.likeCount | items.statistics.viewCount | items.status.embeddable | items.status.license | items.status.madeForKids | items.status.privacyStatus | items.status.publicStatsViewable | items.status.uploadStatus |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | False | | sd | 2d | PT3M33S | True | rectangular | SqP7uUVSol30dxvuScN6JUny6T4 | t1l8Z6gLPzo | youtube#video | ... | 26 | 0 | 1028 | 223172 | True | youtube | False | public | True | processed |
| 1 | False | | hd | 2d | PT7M46S | False | rectangular | m3DnhzTEw9ABiqzBvdasfk5Av_8 | we5gzZq5Avg | youtube#video | ... | 3 | 0 | 124 | 13409 | True | youtube | False | public | True | processed |
| 2 | False | | sd | 2d | PT3M7S | False | rectangular | zyzs7STAR3NG-_pZe-0nGkbKoqg | 49esza4eiK4 | youtube#video | ... | 780 | 0 | 25540 | 10106655 | True | youtube | False | public | True | processed |
| 3 | False | | hd | 2d | PT3M43S | False | rectangular | hX2C15F6fdO5A-stUFMU5Az2PvI | BoO6LfR7ca0 | youtube#video | ... | 0 | 0 | 255 | 29153 | True | youtube | False | public | True | processed |
| 4 | False | | hd | 2d | PT5M | False | rectangular | rYHoV38PLpMbRuX_zhGTVBKNotw | DaH4W1rY9us | youtube#video | ... | 1784 | 0 | 136033 | 16488714 | True | youtube | False | public | True | processed |


In [0]:
# TODO: check your dataframe using `.describe(...)`
### BEGIN STRIP ###
df.describe().toPandas()
### END STRIP ###

| | summary | items.contentDetails.caption | items.contentDetails.contentRating.ytRating | items.contentDetails.definition | items.contentDetails.dimension | items.contentDetails.duration | items.contentDetails.projection | items.etag | items.id | items.kind | ... | items.snippet.thumbnails.standard.width | items.snippet.title | items.statistics.commentCount | items.statistics.dislikeCount | items.statistics.favoriteCount | items.statistics.likeCount | items.statistics.viewCount | items.status.license | items.status.privacyStatus | items.status.uploadStatus |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 20 | 0.0 | 20 | 20 | 20 | 20 | 20 | 20 | 20 | ... | 11.0 | 20 | 19.0 | 20.0 | 20.0 | 20.0 | 20.0 | 20 | 20 | 20 |
| 1 | mean | | | | 2.0 | | | | | | ... | 640.0 | | 949.421052631579 | 407.9 | 0.0 | 20848.85 | 3333855.45 | | | |
| 2 | stddev | | | | 0.0 | | | | | | ... | 0.0 | | 1476.7415449709792 | 642.8754401740258 | 0.0 | 36424.00950251734 | 5279061.979187684 | | | |
| 3 | min | false | | hd | 2d | PT10M16S | rectangular | 0QV0s3_FDhsOv0LX3TwZuRfXBxM | 3go6xFb0FrU | youtube#video | ... | 640.0 | Always Coca Cola A cappella | 1.0 | 0.0 | 0.0 | 1028.0 | 10106655.0 | youtube | public | processed |
| 4 | max | true | | sd | 2d | PT7M59S | rectangular | zyzs7STAR3NG-_pZe-0nGkbKoqg | we5gzZq5Avg | youtube#video | ... | 640.0 | [Electronic] - Pegboard Nerds - Bassline Kicki... | 91.0 | 96.0 | 0.0 | 83129.0 | 92552.0 | youtube | public | processed |


Let's try something that will fail: selecting a column directly by its name.

In [0]:
# TODO: Try to select the first column
# NOTE: ⚠️ This is supposed to fail
### BEGIN STRIP ###
df.select("items.contentDetails.caption").show()
### END STRIP ###

Did you understand what happened here? Because our column names contain dots (`.`), PySpark thinks we are trying to access a field nested inside a struct column when calling `select`!

Spark is written in Scala, and in Spark SQL (as in Scala) backticks delimit a **quoted identifier**. Enclosing a name in backticks tells Spark: "Please, don't interpret the `.`: take this name exactly as it is."

Hence, the solution is to enclose every column name that contains dots in backticks:

```
"hello.world" becomes "`hello.world`"
```
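Adding the backticks by hand to every column name quickly gets tedious, so a small helper can do it. `quote_identifier` is a hypothetical name. Besides wrapping the name, it doubles any backtick already inside it, which is how Spark SQL escapes a backtick within a quoted identifier.

```python
def quote_identifier(name):
    """Wrap a column name in backticks so Spark parses it as a single identifier.

    Backticks already present in the name are doubled, Spark SQL's escape for them.
    """
    return "`" + name.replace("`", "``") + "`"


print(quote_identifier("hello.world"))  # `hello.world`

# On a real DataFrame (requires a Spark session, not run here):
# df.select(*[quote_identifier(c) for c in df.columns]).show()
```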

In [0]:
# TODO: show the selected content of the first column, with the trick we just gave you
### BEGIN STRIP ###
df.select("`items.contentDetails.caption`").show()
### END STRIP ###

Ok, it works, but it's not very convenient... Also, every column name starts with `items`, which is not very informative.

For each column, we will:
- drop the first part, i.e. `items.`
- replace all `.` with `_`

Let's create a function that does exactly this: it takes a column name as a string parameter and returns a sanitized version of it. For instance, `items.content.title` becomes `content_title`.

We will call this function `sanitize_column_name`.

In [0]:
# TODO: create the function: sanitize_column_name
### BEGIN STRIP ###
def sanitize_column_name(col_name):
  return col_name.replace("items.", "").replace(".", "_")
### END STRIP ###

In [0]:
# TODO: use this cell to test your new function
### BEGIN STRIP ###
sanitize_column_name("items.contentDetails.caption")
### END STRIP ###

In [0]:
sanitize_column_name("items.contentDetails.licensedContent")

Now comes an interesting problem: we need to process the entire DataFrame so that all its column names become sanitized. There are several ways to achieve this...

Using `functools.reduce` ([docs](https://docs.python.org/3/library/functools.html#functools.reduce)) is probably the most elegant way to achieve this, but it's not the easiest for beginners... Give it a try! What matters most is that you end up with the right result, so choose whichever method you prefer.

In [0]:
# TODO: create a new DataFrame with all columns sanitized: `df_renamed`
### BEGIN STRIP ###


# toDF() renames columns by position, so the new names must follow the order of df.columns
new_column_name_list = [sanitize_column_name(c) for c in df.columns]
df_renamed = df.toDF(*new_column_name_list)
### END STRIP ###

# TODO: print out the columns of `df_renamed` to check if their names have been properly renamed
### BEGIN STRIP ###
df_renamed.columns
### END STRIP ###
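The cell above renames every column at once with `toDF`. The `functools.reduce` approach mentioned earlier works differently: it folds over the column names, uses the DataFrame itself as the accumulator, and calls `withColumnRenamed` once per column. `FakeFrame` below is a hypothetical stand-in that mimics only `columns` and `withColumnRenamed`, so the sketch runs without a Spark session. On a real DataFrame, the same `reduce(...)` expression applies unchanged.

```python
from functools import reduce


def sanitize_column_name(col_name):
    return col_name.replace("items.", "").replace(".", "_")


class FakeFrame:
    """Hypothetical stand-in for a Spark DataFrame.

    Only `columns` and `withColumnRenamed` are mimicked; like Spark, each
    rename returns a new object and leaves the original untouched.
    """

    def __init__(self, columns):
        self.columns = list(columns)

    def withColumnRenamed(self, existing, new):
        return FakeFrame(new if c == existing else c for c in self.columns)


df = FakeFrame(["items.contentDetails.caption", "items.statistics.viewCount"])

# Fold over the original column names: the accumulator starts as `df`
# and each step returns a frame with one more column renamed
df_renamed = reduce(
    lambda acc, name: acc.withColumnRenamed(name, sanitize_column_name(name)),
    df.columns,
    df,
)
print(df_renamed.columns)  # ['contentDetails_caption', 'statistics_viewCount']
```

The `toDF` version renames everything in one call but relies on column order. The `reduce` version matches each column by its name instead.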

Now we should be able to query this DataFrame without having to mess with annoying backtick escaping.

Let's try it out...

In [0]:
# TODO: select the first column of `df_renamed` and show the first 5 rows
### BEGIN STRIP ###
df_renamed.select("contentDetails_caption").show(5)
### END STRIP ###

It works! :)

## Updating the schema

Let's have a look at the current schema, and more importantly, check out the types of the statistics columns...

In [0]:
# TODO: print out the schema of `df_renamed`
### BEGIN STRIP ###
df_renamed.printSchema()
### END STRIP ###

As you just saw, all the statistics columns are typed as `string` instead of numbers! This would get in the way of later analysis (sorting, comparisons, aggregations), so we need to fix it now. To put you on the right track, we import the type `LongType`: cast all your statistics columns to it.
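The `describe()` output earlier already hinted at this problem. For `viewCount`, the reported `min` (10106655.0) is larger than the `max` (92552.0), because strings are compared character by character rather than numerically. The sketch below reproduces the effect in plain Python with the five `viewCount` values from the sample rows shown earlier.

```python
# viewCount values of the five sample rows shown earlier, kept as strings like in the original schema
view_counts = ["223172", "13409", "10106655", "29153", "16488714"]

# Lexicographic comparison: "10..." sorts first, "29..." sorts last
print(min(view_counts), max(view_counts))  # 10106655 29153

# After a numeric cast, min and max make sense again
as_numbers = [int(v) for v in view_counts]
print(min(as_numbers), max(as_numbers))  # 13409 16488714
```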

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import LongType

# TODO: cast all the statistics columns to `LongType`: `df_casted`
### BEGIN STRIP ###

# withColumn() replaces each statistics column in place and keeps all the other columns
df_casted = df_renamed.withColumn("statistics_commentCount", F.col("statistics_commentCount").cast(LongType())) \
                      .withColumn("statistics_dislikeCount", F.col("statistics_dislikeCount").cast(LongType())) \
                      .withColumn("statistics_favoriteCount", F.col("statistics_favoriteCount").cast(LongType())) \
                      .withColumn("statistics_likeCount", F.col("statistics_likeCount").cast(LongType())) \
                      .withColumn("statistics_viewCount", F.col("statistics_viewCount").cast(LongType()))

### END STRIP ###

# TODO: print out the schema, and check if your casting worked
### BEGIN STRIP ###
df_casted.printSchema()
### END STRIP ###
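Chaining five `withColumn` calls works, but the list of columns to cast can also be computed. The sketch below assumes every column to convert starts with `statistics_`. It builds one SQL expression per column for `selectExpr`, casts the statistics columns to `BIGINT` (the SQL name of `LongType`), and passes the other columns through so none are dropped. The helper `cast_expressions` is hypothetical, and the column list is a small sample of the real one.

```python
def cast_expressions(columns, to_cast, sql_type="BIGINT"):
    """Return one selectExpr() expression per column, casting only those listed in `to_cast`.

    Columns not in `to_cast` are passed through unchanged, so no column is
    dropped and the original column order is preserved.
    """
    return [
        f"CAST(`{c}` AS {sql_type}) AS `{c}`" if c in to_cast else f"`{c}`"
        for c in columns
    ]


# Small sample of the renamed columns; the real list comes from df_renamed.columns
columns = ["snippet_title", "statistics_likeCount", "statistics_viewCount"]
stat_columns = [c for c in columns if c.startswith("statistics_")]

for expr in cast_expressions(columns, stat_columns):
    print(expr)

# On the real DataFrame (requires a Spark session, not run here):
# stat_columns = [c for c in df_renamed.columns if c.startswith("statistics_")]
# df_casted = df_renamed.selectExpr(*cast_expressions(df_renamed.columns, stat_columns))
```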

In [0]:
# Drop the long free-text columns (descriptions and localized title) before storing the data in the warehouse
to_drop = ['snippet_description', 'snippet_localized_description', 'snippet_localized_title']
df_clean = df_casted.drop(*to_drop)

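# Some titles are longer than Redshift's default 256-byte VARCHAR; the spark-redshift connector reads
# the `redshift_type` column metadata below to create a wider column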
df_clean = df_clean.withColumn('snippet_title', df_clean['snippet_title'].alias('snippet_title', metadata={'redshift_type': 'VARCHAR(512)'}))

df_clean.printSchema()

## Loading data onto Redshift

Now that your data is properly formatted, clean and tidy, you have just finished the `transform` step of your ETL pipeline. Congratulations!

What you need to do now is to **LOAD** this data into a place where it can be stored for later analysis. Today, you will load it into a widely used data warehousing solution: [Amazon Redshift](https://aws.amazon.com/redshift/).

The [spark-redshift documentation](https://docs.databricks.com/data/data-sources/aws/amazon-redshift.html) can be helpful... Don't hesitate to have a look.

In [0]:
# TODO: set up your Redshift parameters
# NOTE: again, this is not the appropriate way to do this... You should, at least, use environment variables for the password
REDSHIFT_USER = 'awsuser'
REDSHIFT_PASSWORD = 'REDSHIFT_PASSWORD'
REDSHIFT_PATH_FULL = 'jdbc:postgresql://redshift-cluster-1.cfeedmkxhwtq.eu-west-3.redshift.amazonaws.com:5439/dev'
REDSHIFT_PORT = 5439
REDSHIFT_DB = 'dev'
REDSHIFT_TABLE = 'youtube_data'

properties = {"user": REDSHIFT_USER, "password": REDSHIFT_PASSWORD, "driver": "org.postgresql.Driver"}
# Optional connection check: this read only succeeds once the table exists (e.g. after the write below)
table = spark.read.jdbc(url=REDSHIFT_PATH_FULL, table=REDSHIFT_TABLE, properties=properties)

# For security reasons, the real REDSHIFT_PASSWORD has been removed from the script after the data transfer to AWS Redshift
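As the note in the cell suggests, the password should at least come from an environment variable rather than from the notebook itself. The minimal sketch below keeps the same `properties` dictionary shape. It assumes the variable is named `REDSHIFT_PASSWORD`, which is a hypothetical choice, and `get_redshift_properties` is a hypothetical helper. On Databricks, reading the password from a secret scope with `dbutils.secrets.get` would be a further improvement.

```python
import os


def get_redshift_properties(user, password_env_var="REDSHIFT_PASSWORD"):
    """Build the JDBC connection properties, reading the password from the environment."""
    password = os.environ.get(password_env_var)
    if not password:
        # Fail early instead of sending an empty password to the cluster
        raise RuntimeError(f"environment variable {password_env_var} is not set")
    return {"user": user, "password": password, "driver": "org.postgresql.Driver"}


# Usage inside the notebook:
# properties = get_redshift_properties(REDSHIFT_USER)
```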

As explained in this [tutorial](https://github.com/databricks/spark-redshift/tree/master/tutorial), there are several save modes to choose from when writing data from Spark to Redshift.

The four `mode` values are:
  - `overwrite`: drop the table if it exists, then load the data into a new one
  - `append`: create the table if it does not exist, otherwise append the data to the existing table
  - `error` (default): create the table, or raise an error if it already exists
  - `ignore`: create the table if it does not exist, otherwise do nothing
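To make the differences concrete, here is a purely illustrative Python model of what each mode does to the target table. It is not Spark code. The function `save_mode_action` is hypothetical and only mirrors the list above. Spark also accepts `errorifexists` as a synonym for `error`.

```python
def save_mode_action(mode, table_exists):
    """Illustrative model (not Spark code) of each save mode's effect on the target table."""
    if mode == "overwrite":
        return "drop, then create and load" if table_exists else "create and load"
    if mode == "append":
        return "append rows" if table_exists else "create and load"
    if mode in ("error", "errorifexists"):
        if table_exists:
            raise ValueError("table already exists")
        return "create and load"
    if mode == "ignore":
        return "do nothing" if table_exists else "create and load"
    raise ValueError(f"unknown save mode: {mode!r}")


for mode in ("overwrite", "append", "ignore"):
    print(mode, "->", save_mode_action(mode, table_exists=True))
```

With Spark's JDBC writer, `overwrite` drops and recreates an existing table by default. Setting the JDBC option `truncate` to `true` makes Spark truncate the table instead, which keeps the existing table definition.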

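We ask you to use the `overwrite` mode here. If you write through the spark-redshift connector, also set its `tempformat` option to `CSV`. By default the connector stages the data in S3 as Avro, and Avro field names may only contain letters, digits and `_`, so a column name with dots or other special characters would be rejected. The plain JDBC writer used below does not stage data in S3, so `tempformat` does not apply to it.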

In [0]:
# TODO: write your dataframe `df_clean` into Redshift, using the above parameters
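# Note: only the first 20 rows of `df_clean` were written to AWS Redshift, to keep the transfer short
# and thus reduce costs.
### BEGIN STRIP ###
mode = "overwrite"
# The plain JDBC writer ignores the `redshift_type` metadata set on `snippet_title` (a spark-redshift
# connector feature) and maps strings to TEXT, which Redshift stores as VARCHAR(256);
# `createTableColumnTypes` requests the wider column explicitly when the table is created
df_clean.limit(20).write \
  .option("createTableColumnTypes", "snippet_title VARCHAR(512)") \
  .jdbc(url=REDSHIFT_PATH_FULL, table=REDSHIFT_TABLE, mode=mode, properties=properties)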
### END STRIP ###