# Sparkify Data Lakes with Spark (on AWS EMR)
This notebook builds a data lake from data hosted in S3 by running the necessary ETL steps with Spark on an AWS EMR cluster.

In [None]:
import sys, os
from pathlib import Path
from pyspark.sql import DataFrame
from IPython.core import display as ICD

In [None]:
# Make the project's modules under ../src importable from this notebook.
src_path: str = "../src"
# Guard the append so re-running this cell does not pile up duplicate entries.
if src_path not in sys.path:
    sys.path.append(src_path)

In [None]:
from etl import main as run_etl
from utils import process_config, create_spark_session
from table_schemas import TABLES_SCHEMAS

In [None]:
# Both config files are looked up in the parent of the current working directory.
config_dir = Path(os.getcwd()).parent
user_config = process_config(config_dir.joinpath("_user.cfg"))
dl_config = process_config(config_dir.joinpath("dl.cfg"))

# Build the Spark session from the two parsed configurations.
spark = create_spark_session(user_config, dl_config)

## 1. Extract, Transform and Load data (ETL)

In [None]:
# Run the ETL entry point (`main` imported from the `etl` module as `run_etl`).
run_etl()

## 2. Perform example queries

### 2.1. Get first 5 rows of every table

In [None]:
# Base s3a:// URI of the destination bucket named in the [S3] section of dl.cfg.
dest_bucket_name = dl_config.get("S3", "DEST_BUCKET_NAME")
bucket_prefix = "s3a://{}".format(dest_bucket_name)

In [None]:
# Preview every table listed in TABLES_SCHEMAS: show its first 5 rows and
# report its total number of records.
for table_name, table_schema in TABLES_SCHEMAS.items():
    # The schema has to be set through DataFrameReader.schema(); keyword
    # arguments given to parquet() are reader options, not a schema.
    table_df = spark.read.schema(table_schema).parquet(
        f"{bucket_prefix}/{table_name}"
    )
    n_elem = table_df.count()
    # limit() keeps the DataFrame's own column layout. Rebuilding a DataFrame
    # from take(5) rows with table_schema matches values to columns by
    # position, which misaligns them when the columns read back (e.g. the
    # partition columns of a partitioned table) are ordered differently.
    table_df_preview = table_df.limit(5).toPandas()

    print(f"First 5 rows of {table_name}:")
    ICD.display(table_df_preview)
    print(f"The full table contains a total of {n_elem} records")

### 2.2. Who are the top 5 users with the highest activity?

In [None]:
# Load the songplays table from the destination bucket.
# The schema is applied with DataFrameReader.schema(); passing it as a
# keyword argument to parquet() does not set the schema.
songplays_df = spark.read.schema(TABLES_SCHEMAS["songplays"]).parquet(
    f"{bucket_prefix}/songplays"
)

In [None]:
# Count songplays per user and keep the 5 users with the most records.
# The aggregated result has its own (user_id, count) columns, so it is
# converted directly instead of being rebuilt with an unrelated table schema
# left over from an earlier cell.
df = (
    songplays_df.groupBy("user_id")
    .count()
    .sort("count", ascending=False)
    .limit(5)
    .toPandas()
)
ICD.display(df)