#### Code executed in JupyterHub on Amazon EMR

In [None]:
from pyspark.sql.functions import isnan, when, count, col, from_unixtime, to_timestamp, to_date, udf
from pyspark.sql.types import DateType

#### S3 path definitions

In [None]:
# Input CSV files (raw bucket) and the output prefix (processed bucket).
path_movies = "s3://datarocket-raw/movielens/movies.csv"
path_ratings = "s3://datarocket-raw/movielens/ratings.csv"
s3_path_output = "s3://datarocket-processed/outputs/"

##### Read CSV files

In [None]:
# Load the movies CSV, treating its first line as the header row.
df_movies = spark.read.csv(path_movies, header=True)

In [None]:
# Preview the first five movie rows.
df_movies.show(5)

In [None]:
# Load the ratings CSV, treating its first line as the header row.
df_ratings = spark.read.csv(path_ratings, header=True)

In [None]:
# Preview the first five rating rows.
df_ratings.show(5)

In [None]:
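# Not needed: the join below selects df_movies.movieId explicitly, so the
# duplicate key column never reaches the result.
# df_ratings = df_ratings.withColumnRenamed('movieId', 'movieId_rating')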

#### Join datasets

In [None]:
# Attach every rating to its movie by matching movieId, keeping the
# movie-side key plus the descriptive and rating columns.
df_final = (
    df_movies
    .join(df_ratings, on=(df_movies.movieId == df_ratings.movieId), how='inner')
    .select(df_movies.movieId, 'title', 'genres', 'userId', 'rating', 'timestamp')
)

In [None]:
# Preview the joined result (20 rows is show()'s default).
df_final.show(n=20)

In [None]:
def _missing_count(name):
    """Build an aggregate counting rows where column `name` is NaN or null."""
    return count(when(isnan(name) | col(name).isNull(), name)).alias(name)


# One output column per input column, each holding its missing-value count.
df_final.select([_missing_count(name) for name in df_final.columns]).show()

In [None]:
# Inspect the column names and data types of the joined DataFrame.
df_final.dtypes

###### Convert Unix timestamps to datetime strings with a UDF

In [None]:
from datetime import datetime


def convert_timestamp_to_datetime(timestamp):
    """Convert a Unix epoch timestamp in seconds to a datetime string.

    Args:
        timestamp: Epoch seconds as an int or a numeric string, or None.

    Returns:
        The datetime formatted as 'YYYY-MM-DD HH:MM:SS' in the local time
        zone of the machine running the function, or None when `timestamp`
        is None.

    Raises:
        ValueError: If `timestamp` is a string that is not a valid integer.
    """
    # Spark hands SQL nulls to Python UDFs as None; int(None) would raise
    # TypeError and fail the whole job, so pass nulls through instead.
    if timestamp is None:
        return None
    return str(datetime.fromtimestamp(int(timestamp)))

In [None]:
# Wrap the Python converter as a Spark UDF (udf's default return type is string).
udf_convert_timestamp_to_datetime = udf(f=convert_timestamp_to_datetime)

In [None]:
# Derive a readable 'date' column from the epoch 'timestamp' column.
# NOTE(review): the built-in from_unixtime (already imported) could likely
# replace this Python UDF — confirm its time-zone handling matches first.
df_final = df_final.withColumn(
    'date',
    udf_convert_timestamp_to_datetime(df_final['timestamp']),
)

In [None]:
# Preview only: print the schema with 'date' cast to DateType. The result is
# not assigned, so df_final itself keeps its current 'date' column.
(
    df_final
    .withColumn('date', df_final['date'].cast(DateType()))
    .printSchema()
)

In [None]:
# Remove the raw epoch column now that 'date' has been derived from it.
df_final = df_final.drop("timestamp")

In [None]:
# List the column names of df_final.
df_final.columns

In [None]:
# Preview the final dataset (20 rows is show()'s default).
df_final.show(n=20)

###### Save the result to S3 as Parquet

In [None]:
# Append the final dataset to the output prefix as Parquet. Because the mode
# is 'append', re-running this cell adds the same rows again.
df_final.write.parquet(s3_path_output, mode='append')