In [0]:
%sql
USE pinterest_data_9105411ea84a;

-- Injesting raw data and creating temp views

CREATE OR REPLACE TEMPORARY VIEW raw_df_pin
USING json
OPTIONS (
  path "s3://user-9105411ea84a-bucket/topics/9105411ea84a.pin/partition=0/"
);

CREATE OR REPLACE TEMPORARY VIEW raw_df_geo
USING json
OPTIONS (
  path "s3://user-9105411ea84a-bucket/topics/9105411ea84a.geo/partition=0/"
);

CREATE OR REPLACE TEMPORARY VIEW raw_df_user
USING json
OPTIONS (
  path "s3://user-9105411ea84a-bucket/topics/9105411ea84a.user/partition=0/"
);


In [0]:
%sql

--Cleaning raw data

--df_pin

CREATE OR REPLACE TEMPORARY VIEW clean_df_pin AS
SELECT
  -- Renaming "index" to "ind"
  index AS ind,
  
  -- Cleaning and transforming string columns
  CASE WHEN TRIM(unique_id) = '' THEN NULL ELSE unique_id END AS unique_id,
  CASE WHEN TRIM(title) = '' THEN NULL ELSE title END AS title,
  CASE WHEN TRIM(description) = '' THEN NULL ELSE description END AS description,
  
  -- Cleaning follower_count
  CASE 
    WHEN TRIM(follower_count) = '' OR follower_count IS NULL THEN NULL
    WHEN TRIM(follower_count) RLIKE '^[0-9]+$' THEN CAST(follower_count AS INT)  -- Already a number
    WHEN TRIM(follower_count) LIKE '%M' THEN CAST(REPLACE(follower_count, 'M', '') AS FLOAT) * 1000000
    WHEN TRIM(follower_count) LIKE '%k' THEN CAST(REPLACE(follower_count, 'k', '') AS FLOAT) * 1000
    ELSE NULL
  END AS follower_count,
  
  CASE WHEN TRIM(poster_name) = '' THEN NULL ELSE poster_name END AS poster_name,
  CASE WHEN TRIM(tag_list) = '' THEN NULL ELSE tag_list END AS tag_list,
  CASE WHEN TRIM(is_image_or_video) = '' THEN NULL ELSE is_image_or_video END AS is_image_or_video,
  
  -- Treat empty values as NULL
  CASE WHEN TRIM(img_src) = '' THEN NULL ELSE img_src END AS img_src,
  
  -- Clean save_location by removing everything before the first "/"
  CASE 
    WHEN TRIM(save_location) = '' THEN NULL 
    ELSE regexp_extract(TRIM(save_location), '(/.*)$', 1)
  END AS save_location,
  
  CASE WHEN TRIM(category) = '' THEN NULL ELSE category END AS category

FROM raw_df_pin;



--df_geo

CREATE OR REPLACE TEMPORARY VIEW clean_df_geo AS
SELECT
  ind,
  country,
  -- Create a new column 'coordinates' by concatenating latitude and longitude with a comma separator.
  CONCAT(CAST(latitude AS STRING), ',', CAST(longitude AS STRING)) AS coordinates,

  -- Convert timestamp to TIMESTAMP (if it isn't already)
  CAST(timestamp AS TIMESTAMP) AS timestamp
FROM raw_df_geo;


--df_user

CREATE OR REPLACE TEMPORARY VIEW clean_df_user AS
SELECT
  ind,
  -- Create a new column 'user_name' by concatenating first_name and last_name with a space inbetween
  CONCAT(first_name, ' ', last_name) AS user_name,
  age,
  CAST(date_joined AS TIMESTAMP) AS date_joined
FROM raw_df_user;

In [0]:
%sql

--Write clean data to the tables

INSERT OVERWRITE TABLE df_pin
SELECT * FROM clean_df_pin;


INSERT OVERWRITE TABLE df_geo
SELECT * FROM clean_df_geo;


INSERT OVERWRITE TABLE df_user
SELECT * FROM clean_df_user;
