# Instalando pacotes e libs

In [34]:
!pip install pandas
!pip install pyarrow
!pip install s3fs
!pip install simplejson

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [35]:
import urllib.request
from zipfile import ZipFile
import pandas as pd
import os

# Pegando account name para criar o nome do bucket de forma única todas as vezes

In [36]:
import simplejson
with open('/opt/ml/metadata/resource-metadata.json') as fh:
    metadata = simplejson.loads(fh.read())
accountid = metadata['ResourceArn'].split(':')[4]

%set_env accountid={accountid}
%set_env bucket_name=lab-{accountid}

env: accountid=877757629758
env: bucket_name=lab-877757629758


# Criando bucket para o Datalake

In [37]:
%%bash
if [[ -z $(aws s3api head-bucket --bucket $bucket_name) ]]; then
        echo "bucket exists. deleting bucket..."
        aws s3 rb s3://$bucket_name --force 
else
        echo "bucket does not exist. creating bucket..."
fi

aws s3 mb s3://$bucket_name --region us-east-2
aws s3 ls s3://$bucket_name/

bucket exists. deleting bucket...
make_bucket: lab-877757629758



An error occurred (400) when calling the HeadBucket operation: Bad Request
fatal error: An error occurred (NoSuchBucket) when calling the ListObjectsV2 operation: The specified bucket does not exist

remove_bucket failed: Unable to delete all objects in the bucket, bucket will not be deleted.


# Criando pastas no bucket

In [38]:
%%bash
aws s3api put-object --bucket $bucket_name --key data/landing/
aws s3api put-object --bucket $bucket_name --key data/staging/
aws s3api put-object --bucket $bucket_name --key data/analytics/

{
    "ETag": "\"d41d8cd98f00b204e9800998ecf8427e\""
}
{
    "ETag": "\"d41d8cd98f00b204e9800998ecf8427e\""
}
{
    "ETag": "\"d41d8cd98f00b204e9800998ecf8427e\""
}


# Baixando dataset

In [39]:
print("downloading file from movielens website...")
urllib.request.urlretrieve(
        'http://files.grouplens.org/datasets/movielens/ml-1m.zip',
        '/tmp/ml-1m.zip')

downloading file from movielens website...


('/tmp/ml-1m.zip', <http.client.HTTPMessage at 0x7f892852c630>)

In [40]:
print("extracting dataset into tmp folder...")
with ZipFile('/tmp/ml-1m.zip', 'r') as zipObj:
   zipObj.extractall('/tmp/')

extracting dataset into tmp folder...


# Realizando ETL e upload dos datasets

In [41]:
import datetime

x = datetime.datetime.now()
etl_date = x.strftime("%Y%m%d_%H%M%S")
print(etl_date) 
%set_env etl_date={etl_date}

20200727_221815
env: etl_date=20200727_221815


In [42]:
%%bash
aws s3 cp /tmp/ml-1m/movies.dat s3://$bucket_name/data/landing/movies/movies_$etl_date.dat
aws s3 cp /tmp/ml-1m/ratings.dat s3://$bucket_name/data/landing/ratings/ratings_$etl_date.dat

upload: ../../../../tmp/ml-1m/movies.dat to s3://lab-877757629758/data/landing/movies/movies_20200727_221815.dat
upload: ../../../../tmp/ml-1m/ratings.dat to s3://lab-877757629758/data/landing/ratings/ratings_20200727_221815.dat


In [43]:
print("reading csv files...")
movies_df = pd.read_csv("/tmp/ml-1m/movies.dat", "::", 
                        engine='python', 
                        header=None, 
                        names=['movieid', 'title', 'genres']) 
print("movies_df has %s lines" % movies_df.shape[0])

reading csv files...
movies_df has 3883 lines


In [44]:
ratings_df = pd.read_csv("/tmp/ml-1m/ratings.dat", "::", 
                         engine='python', 
                         header=None, 
                         names=['userid', 'movieid', 'rating', 'timestamp']) 
print("ratings_df has %s lines" % ratings_df.shape[0])

ratings_df has 1000209 lines


In [45]:
print("merging dataframes...")
merged_df = pd.merge(movies_df, ratings_df, on='movieid')

merging dataframes...


In [46]:
print("aggregating data...")
aggregation_df = merged_df.groupby('title').agg({'rating': ['count', 'mean']})
aggregation_df.columns = aggregation_df.columns.droplevel(level=0)
aggregation_df = aggregation_df.rename(columns={
    "count": "rating_count", "mean": "rating_mean"
})

aggregating data...


In [47]:
print("sorting data...")
aggregation_df = aggregation_df.sort_values(
        'rating_mean', 
        ascending=False).loc[aggregation_df['rating_count'] > 1000].head()

sorting data...


In [48]:
print("writing files to s3...")

movies_df.to_parquet(
        "s3://" + 
        os.getenv('bucket_name') + 
        "/data/analytics/movies/movies_" + 
        etl_date + 
        ".parquet.snappy")

ratings_df.to_parquet(
        "s3://" + 
        os.getenv('bucket_name') + 
        "/data/analytics/ratings/ratings_" +
        etl_date + 
        ".parquet.snappy")

aggregation_df.to_parquet(
        "s3://" + 
        os.getenv('bucket_name') + 
        "/data/analytics/best_movies/best_movies_" +
        etl_date + 
        ".parquet.snappy")

writing files to s3...


# Fazendo leitura dos dados

In [49]:
print("reading file from s3 and printing result...")
result_df = pd.read_parquet(
        "s3://" + 
        os.getenv('bucket_name') + 
        "/data/analytics/best_movies/best_movies_" + etl_date + ".parquet.snappy")
print("result_df has %s lines" % result_df.size)

reading file from s3 and printing result...
result_df has 10 lines
