In [None]:
# Wipe the raw and refined layers left behind by earlier runs so the pipeline starts clean.
# The second argument to dbutils.fs.rm is `recurse`; True deletes each directory and its contents.
for layer_path in ('dbfs:/FileStore/raw', 'dbfs:/FileStore/refined'):
    dbutils.fs.rm(layer_path, True)

# Confirm the cleanup finished
print('All previously created layers removed!')

In [None]:
# Import libraries
import numpy as np
import pandas as pd
import pyspark as ps
from pyspark.sql import functions as F

# Extraction

## Import the CSV file as a PySpark DataFrame

In [None]:
# Source CSV to ingest
path_to_csv = '/FileStore/tables/topSubscribed.csv'

# Reader options: the first row holds the headers, fields are comma-separated, and schema
# inference is disabled, so every column arrives as a string and can be cleaned before casting
csv_options = {
    'inferSchema': 'false',
    'header': 'true',
    'sep': ',',
}

df = spark.read.format('csv').options(**csv_options).load(path_to_csv)

# Inspect the raw rows, then the raw schema
display(df)
df.printSchema()

## Save the raw DataFrame as a Parquet file (1st layer)

In [None]:
# Persist the untouched raw dataframe as Parquet (1st layer).
# mode('overwrite') keeps this cell re-runnable: Spark's default save mode ('errorifexists')
# would fail if dbfs:/FileStore/raw/topSubscribed already exists. It also matches the refined-layer writes.
# Parquet stores column names in its own schema, so the CSV-only 'header' option is not passed.
df.write\
    .format('parquet')\
    .mode('overwrite')\
    .save('dbfs:/FileStore/raw/topSubscribed')

# Transformation

## Clean column headers

In [None]:
# Show dataframe column headers and types.
# Headers are still the raw CSV names here. Because inferSchema was disabled when reading,
# every column is typed as a string at this point.
df.printSchema()

In [None]:
# Normalise every header: trim surrounding whitespace, lowercase it, and turn each space into an underscore
newColumns = list(map(lambda header: '_'.join(header.strip().lower().split(' ')), df.columns))

# Rebuild the dataframe under the normalised (snake_case) headers, keeping column order
df = df.toDF(*newColumns)

# Confirm the renamed columns
df.printSchema()

## Clean column values

In [None]:
# Show dataframe values after header normalisation and before any value cleaning
display(df)

In [None]:
# Clean the raw string values before they are cast:
#   * text columns (youtube_channel, category) are trimmed and lowercased;
#   * thousands separators (commas) are stripped from the numeric columns so the later casts succeed.
# Every expression uses F.col rather than a bound df[...] reference, so each step sees the frame
# produced by the previous one. 'started' needs no cleaning, so it is left untouched.
df = df\
    .withColumn('youtube_channel', F.lower(F.trim(F.col('youtube_channel'))))\
    .withColumn('category', F.lower(F.trim(F.col('category'))))\
    .withColumn('subscribers', F.translate(F.col('subscribers'), ',', ''))\
    .withColumn('video_views', F.translate(F.col('video_views'), ',', ''))\
    .withColumn('video_count', F.translate(F.col('video_count'), ',', ''))

# Show dataframe with updated values
display(df)

## Update data types

In [None]:
# Show dataframe data types before casting.
# The numeric columns are still strings here: commas were stripped by F.translate, but no cast has been applied yet.
df.printSchema()

In [None]:
# Target Spark SQL type for each column that needs casting, applied in this order.
# video_views is cast to long, presumably because view counts can exceed the int range.
target_types = {
    'rank': 'int',
    'subscribers': 'int',
    'video_views': 'long',
    'video_count': 'int',
    'started': 'int',
}

for column_name, spark_type in target_types.items():
    df = df.withColumn(column_name, F.col(column_name).cast(spark_type))

# Confirm the new column types
df.printSchema()

## Split the DataFrame into smaller DataFrames by column subset

In [None]:
# Column subsets for each refined table; 'rank' appears in every one, presumably so the tables can be re-joined
df_chans_cols = ['rank', 'youtube_channel', 'started']
df_subs_cols = ['rank', 'subscribers']
df_vids_cols = ['rank', 'video_views', 'video_count']
df_cats_cols = ['rank', 'category']

# Project the cleaned dataframe onto each subset: channels, subscriptions, videos, categories
df_chans, df_subs, df_vids, df_cats = (
    df.select(selected_cols)
    for selected_cols in (df_chans_cols, df_subs_cols, df_vids_cols, df_cats_cols)
)

# Preview each new dataframe
for refined_df in (df_chans, df_subs, df_vids, df_cats):
    display(refined_df)

## Save the new refined DataFrames as Parquet files (2nd layer)

In [None]:
# Refined-layer table name -> dataframe to persist; each is written to dbfs:/FileStore/refined/<name>
refined_tables = {
    'channels': df_chans,
    'subscriptions': df_subs,
    'videos': df_vids,
    'categories': df_cats,
}

# Write every refined dataframe as Parquet (2nd layer).
# mode('overwrite') keeps the cell re-runnable. Parquet stores column names in its own
# schema, so the CSV-only 'header' option is not passed.
for table_name, table_df in refined_tables.items():
    table_df.write\
        .format('parquet')\
        .mode('overwrite')\
        .save(f'dbfs:/FileStore/refined/{table_name}')