In [1]:
!pip install pandas
!pip install pyarrow
!pip install s3fs

Collecting pandas
  Using cached https://files.pythonhosted.org/packages/52/3f/f6a428599e0d4497e1595030965b5ba455fd8ade6e977e3c819973c4b41d/pandas-0.25.3-cp36-cp36m-manylinux1_x86_64.whl
Collecting numpy>=1.13.3
  Using cached https://files.pythonhosted.org/packages/d2/ab/43e678759326f728de861edbef34b8e2ad1b1490505f20e0d1f0716c3bf4/numpy-1.17.4-cp36-cp36m-manylinux1_x86_64.whl
Collecting pytz>=2017.2
  Using cached https://files.pythonhosted.org/packages/e7/f9/f0b53f88060247251bf481fa6ea62cd0d25bf1b11a87888e53ce5b7c8ad2/pytz-2019.3-py2.py3-none-any.whl
Installing collected packages: numpy, pytz, pandas
Successfully installed numpy-1.17.4 pandas-0.25.3 pytz-2019.3
Collecting pyarrow
  Using cached https://files.pythonhosted.org/packages/6c/32/ce1926f05679ea5448fd3b98fbd9419d8c7a65f87d1a12ee5fb9577e3a8e/pyarrow-0.15.1-cp36-cp36m-manylinux2010_x86_64.whl
Installing collected packages: pyarrow
Successfully installed pyarrow-0.15.1
Collecting s3fs
  Using cached https://files.pythonhosted.o

In [2]:
import urllib.request
from zipfile import ZipFile
import pandas as pd

In [3]:
bucket = "angelo-datalake"

# download MovieLens 1M Dataset
print("downloading file from movielens website...")
urllib.request.urlretrieve(
        'http://files.grouplens.org/datasets/movielens/ml-1m.zip',
        '/tmp/ml-1m.zip')

downloading file from movielens website...


('/tmp/ml-1m.zip', <http.client.HTTPMessage at 0x7f85f1522c18>)

In [4]:
# extract the zip file
print("extracting dataset into tmp folder...")
with ZipFile('/tmp/ml-1m.zip', 'r') as zipObj:
   zipObj.extractall('/tmp/')

extracting dataset into tmp folder...


In [5]:
# read the csv
print("reading csv files...")
movies_df = pd.read_csv("/tmp/ml-1m/movies.dat", "::", 
                        engine='python', 
                        header=None, 
                        names=['movieid', 'title', 'genres']) 
print("movies_df has %s lines" % movies_df.shape[0])
ratings_df = pd.read_csv("/tmp/ml-1m/ratings.dat", "::", 
                         engine='python', 
                         header=None, 
                         names=['userid', 'movieid', 'rating', 'timestamp']) 
print("ratings_df has %s lines" % ratings_df.shape[0])

reading csv files...
movies_df has 3883 lines
ratings_df has 1000209 lines


In [6]:
# join both dataframes
print("merging dataframes...")
merged_df = pd.merge(movies_df, ratings_df, on='movieid')

# aggregate data from dataframes, counting votes...
print("aggregating data...")
aggregation_df = merged_df.groupby('title').agg({'rating': ['count', 'mean']})
aggregation_df.columns = aggregation_df.columns.droplevel(level=0)
aggregation_df = aggregation_df.rename(columns={
    "count": "rating_count", "mean": "rating_mean"
})

merging dataframes...
aggregating data...


In [7]:
# sorting data and filtering only movies with more than 1000 votes...
print("sorting data...")
aggregation_df = aggregation_df.sort_values(
        'rating_mean', 
        ascending=False).loc[aggregation_df['rating_count'] > 1000].head()

sorting data...


In [8]:
# writing data...
print("writing file to s3...")
aggregation_df.to_parquet(
        "s3://" + 
        bucket + 
        "/data/processed/best_movies/best_movies.parquet.snappy")

writing file to s3...


In [9]:
# reading data...
print("reading file from s3 and printing result...")
result_df = pd.read_parquet(
        "s3://" + 
        bucket + 
        "/data/processed/best_movies/best_movies.parquet.snappy")
print("result_df has %s lines" % result_df.size)

reading file from s3 and printing result...
result_df has 10 lines


In [10]:
print("Best rated movie is: ")
print(result_df[0:1])

Best rated movie is: 
                                  rating_count  rating_mean
title                                                      
Shawshank Redemption, The (1994)          2227     4.554558
