In [0]:
%run "./0.Config"

### 1. Ingest raw data

In [0]:
# Load the raw CSV: the first row supplies column names and Spark infers column types.
# NOTE(review): `raw_data_path` is not defined in this notebook; presumably it is set by
# the ./0.Config notebook executed via %run above.
raw_data_df = spark.read.options(header=True, inferSchema=True).csv(raw_data_path)

### Data cleaning

In [0]:
# Columns present in the raw extract that none of the processed tables use.
unused_cols = (
    'rank',
    'title',
    'channel_type',
    'video_views_rank',
    'country_rank',
    'channel_type_rank',
    'video_views_for_the_last_30_days',
    'subscribers_for_last_30_days',
    'Gross tertiary education enrollment (%)',
    'Population',
    'Unemployment rate',
    'Urban_population',
    'Latitude',
    'Longitude',
)

# drop() ignores names that are absent, so this is safe if the extract changes.
filtered_raw_data_df = raw_data_df.drop(*unused_cols)

### Create processed `Category` DF

In [0]:
# One row per distinct `category` value (including a single null row, if any).
raw_category_df = filtered_raw_data_df.select('category').distinct()

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

In [0]:
# Category dimension: one surrogate key (`cat_id`) per distinct category.
#
# Why not monotonically_increasing_id(): its values depend on partition IDs and row
# order within partitions, and raw_category_df comes out of a shuffle (dropDuplicates).
# This DataFrame is evaluated lazily and separately for the join into the youtuber table
# and for the category table write. The two evaluations could therefore assign different
# ids, leaving youtuber.cat_id pointing at the wrong category rows.
# A row_number over a unique ordering (category values are distinct here) gives the
# same id on every evaluation.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# An un-partitioned window moves all rows to one partition. That is acceptable here
# because there is only one row per distinct category.
_category_order = Window.orderBy('category')

processed_category_df = (
    raw_category_df
    # Start at 0 and cast to long so `cat_id` keeps the BIGINT type that
    # monotonically_increasing_id produced. This avoids a schema change on the
    # Delta overwrite.
    .withColumn('cat_id', (row_number().over(_category_order) - 1).cast('long'))
    .select('cat_id', 'category')
)

### Create processed `Country` DF

In [0]:
# One row per distinct (Country, Abbreviation) pair.
raw_country_df = filtered_raw_data_df.select('Country', 'Abbreviation').distinct()

In [0]:
# Country dimension: lower-case column names plus a surrogate key (`country_id`).
#
# Why not monotonically_increasing_id(): raw_country_df is the output of a shuffle
# (dropDuplicates), so that function's ids depend on non-deterministic partition/row
# placement. This DataFrame is re-evaluated for the youtuber join and again for the
# country table write, so the ids could disagree between the two tables.
# row_number over a unique ordering gives identical ids on every evaluation.
#
# NOTE(review): rows are unique on (country, abbreviation), but the youtuber join joins on
# `country` only. If a country ever appears with more than one abbreviation, youtuber
# rows would be duplicated by that join. Verify against the data.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# The (country, abbreviation) pair is unique after dedup, so this ordering is total.
# An un-partitioned window uses a single partition, which is fine for a small dimension.
_country_order = Window.orderBy('country', 'abbreviation')

processed_country_df = (
    raw_country_df
    .withColumnRenamed('Country', 'country')
    .withColumnRenamed('Abbreviation', 'abbreviation')
    # Start at 0 and cast to long so `country_id` stays BIGINT, matching the previous
    # table schema for the Delta overwrite.
    .withColumn('country_id', (row_number().over(_country_order) - 1).cast('long'))
    .select('country_id', 'country', 'abbreviation')
)


### Create processed `Youtuber` DF

In [0]:
from pyspark.sql.functions import from_unixtime,unix_timestamp,to_timestamp,concat,lit,when,length,col

In [0]:
# Build a single `created_day` timestamp from the separate year / month / day columns.
# The month (parsed with pattern 'MMM', presumably an abbreviated name such as 'Jan')
# is turned into a two-digit number. A one-character day gets a leading '0'. Both steps
# make the concatenated string line up with the 'MM-dd-yyyy' pattern.
month_as_number = from_unixtime(unix_timestamp('created_month', 'MMM'), 'MM')
day_zero_padded = (
    when(length('created_date') == 1, concat(lit('0'), 'created_date'))
    .otherwise(col('created_date'))
)

raw_youtuber_df = (
    filtered_raw_data_df
    .withColumn('created_month_MM', month_as_number)
    .withColumn('created_date_DD', day_zero_padded)
    .withColumn(
        'created_day',
        to_timestamp(
            concat('created_month_MM', lit('-'), 'created_date_DD', lit('-'), 'created_year'),
            "MM-dd-yyyy",
        ),
    )
)


In [0]:
# Give every channel a surrogate key.
raw_youtuber_df = raw_youtuber_df.withColumn('youtuber_id', monotonically_increasing_id())

# Normalise the remaining raw column names (applied in this order).
for old_name, new_name in (
    ("Youtuber", "youtuber"),
    ("video views", "total_views"),
    ("uploads", "video_uploaded"),
    ("Country", "country"),
):
    raw_youtuber_df = raw_youtuber_df.withColumnRenamed(old_name, new_name)

# Remove the raw date parts and helper columns (now folded into `created_day`)
# and the abbreviation (kept in the country dimension instead).
drops = [
    'created_year',
    'created_month',
    'created_date',
    'Abbreviation',
    'created_date_DD',
    'created_month_MM',
]
raw_youtuber_df = raw_youtuber_df.drop(*drops)

In [0]:
# Attach `country_id` (and the country columns). The left join keeps channels whose
# country has no match in the country dimension; their `country_id` is null.
raw_youtuber_join_country_df = raw_youtuber_df.join(
    processed_country_df,
    on=['country'],
    how='left',
)

In [0]:
# Attach `cat_id`. The left join keeps channels whose category has no match
# (for example a null category); their `cat_id` is null.
raw_youtuber_join_cat_country_df = raw_youtuber_join_country_df.join(
    processed_category_df,
    on=['category'],
    how='left',
)

In [0]:
# Final column layout of the youtuber table: keys, then metrics, then creation date.
cols = [
    'youtuber_id', 'youtuber', 'country_id', 'cat_id',
    'subscribers', 'total_views', 'video_uploaded',
    'lowest_monthly_earnings', 'highest_monthly_earnings',
    'lowest_yearly_earnings', 'highest_yearly_earnings',
    'created_day',
]
processed_youtuber_df = raw_youtuber_join_cat_country_df.select(cols)

### Write the processed DataFrames to Delta tables in `global_youtube_statistics_2023_processed`

In [0]:
# Persist each processed DataFrame as a Delta table, replacing its previous contents.
# Tables are written in this order: youtuber, category, country.
for table_name, table_df in (
    ('global_youtube_statistics_2023_processed.youtuber', processed_youtuber_df),
    ('global_youtube_statistics_2023_processed.category', processed_category_df),
    ('global_youtube_statistics_2023_processed.country', processed_country_df),
):
    table_df.write.format('delta').mode('overwrite').saveAsTable(table_name)