# Create and load the data

In [0]:
# Create single DataFrame
def create_df(topic: str):
    '''
    Reads the JSON files stored for one topic in the mounted S3 bucket into a single dataframe.

    Parameters
    ----------
    topic: str
        The name of the Kafka topic, which is also the name of the directory inside bucket-name/topics.
    
    Returns
    -------
    df : pyspark.sql.dataframe.DataFrame
        The dataframe loaded from the JSON files found under the topic's partition=0 directory.
    '''
    # Glob matching every JSON file under the topic's partition=0 directory
    bucket_topics_dir = "/mnt/user-0a6a638f5991-mounted-bucket/topics/"
    partition_glob = "/partition=0/*.json"
    json_glob = "".join((bucket_topics_dir, topic, partition_glob))

    # Configure a JSON reader and ask Spark to infer the schema
    json_reader = spark.read.format("json")
    json_reader = json_reader.option("inferSchema", "true")

    # Read in the JSONs from the mounted S3 bucket and hand the dataframe back to the caller
    return json_reader.load(json_glob)

In [0]:
# Using method above, create all three dataframes and return them
def load_dfs():
    '''
    Loads the three dataframes for the pin, geo, and user data.

    Each dataframe is created with create_df from the topic named
    "0a6a638f5991.<suffix>", where the suffix is "pin", "geo" or "user".

    Returns
    -------
    df_pin : pyspark.sql.dataframe.DataFrame
        The Pinterest post data.
    df_geo : pyspark.sql.dataframe.DataFrame
        The Pinterest geolocation data.
    df_user : pyspark.sql.dataframe.DataFrame
        The Pinterest user data.
    '''
    topic_prefix = "0a6a638f5991"
    # The topics are read in the order pin, geo, user
    df_pin, df_geo, df_user = (
        create_df(f"{topic_prefix}.{suffix}") for suffix in ("pin", "geo", "user")
    )
    return df_pin, df_geo, df_user

In [0]:
########## IMPORTS ##########
from pyspark.sql.functions import regexp_replace
# from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from pyspark.sql.functions import array
from pyspark.sql.functions import concat_ws
import pyspark
import multiprocessing

# Data cleaning

In [0]:
# Clean pin data
def clean_pin_data(pin_df: pyspark.sql.dataframe.DataFrame):
    '''
    Cleans the DataFrame that contains information about Pinterest posts.

    The steps are: drop duplicate rows, turn placeholder strings into nulls,
    expand the 'k'/'M' suffixes of follower_count, cast follower_count and index
    to int, strip the "Local save in " prefix from save_location, rename index
    to ind, and select the output columns in a fixed order. The 'downloaded'
    column is not part of the selected output.

    Parameters
    ----------
    pin_df : pyspark.sql.dataframe.DataFrame
        The DataFrame to be cleaned.
        The DataFrame must contain the columns: category, description, downloaded, follower_count, image_src, index, is_image_or_video, poster_name, save_location, tag_list, title, unique_id.
    
    Returns
    -------
    df : pyspark.sql.dataframe.DataFrame
        The cleaned Pinterest post data, with the columns: ind, unique_id, title, description, follower_count, poster_name, tag_list, is_image_or_video, image_src, save_location, category.
    '''
    cleaned = pin_df.dropDuplicates().alias('df')

    # placeholder strings that stand for "no value", paired with the column(s) they are replaced in
    missing_value_markers = (
        ('User Info Error', ['follower_count', 'poster_name']),
        ('N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e', 'tag_list'),
        ('No description available Story format', 'description'),
        ('Image src error.', 'image_src'),
        ('No Title Data Available', 'title'),
    )
    for marker, columns in missing_value_markers:
        cleaned = cleaned.replace(marker, None, columns)

    # expand the magnitude suffixes of 'follower_count': 'k' -> '000', then 'M' -> '000000'
    # NOTE(review): this is a plain text substitution, so a value such as '1.5k' becomes '1.5000'
    # before the int cast below - confirm the data never contains decimal counts
    for suffix, zeros in (('k', '000'), ('M', '000000')):
        cleaned = cleaned.withColumn('follower_count', regexp_replace('follower_count', suffix, zeros))

    # keep only the path in 'save_location' by removing the "Local save in " text
    cleaned = cleaned.withColumn('save_location', regexp_replace('save_location', 'Local save in ', ''))

    # cast the numeric columns to integer
    cleaned = cleaned.withColumn('follower_count', cleaned['follower_count'].cast('int'))
    cleaned = cleaned.withColumn('index', cleaned['index'].cast('int'))

    # 'index' is exposed as 'ind' in the cleaned data
    cleaned = cleaned.withColumnRenamed('index', 'ind')

    # final column order of the cleaned dataframe
    output_columns = [
        'ind',
        'unique_id',
        'title',
        'description',
        'follower_count',
        'poster_name',
        'tag_list',
        'is_image_or_video',
        'image_src',
        'save_location',
        'category',
    ]
    return cleaned.select(*output_columns)


In [0]:
# Clean geo data
def clean_geo_data(geo_df: pyspark.sql.dataframe.DataFrame):
    '''
    Cleans the DataFrame that contains information about geolocation.

    The steps are: drop duplicate rows, cast latitude and longitude to float and
    combine them into an array column named coordinates, cast timestamp to a
    timestamp type and ind to int, and select the output columns in a fixed order.

    Parameters
    ----------
    geo_df : pyspark.sql.dataframe.DataFrame
        The DataFrame to be cleaned.
        The DataFrame must contain the columns: country, ind, latitude, longitude, timestamp.
    
    Returns
    -------
    df : pyspark.sql.dataframe.DataFrame
        The cleaned geolocation data, with the columns: ind, country, coordinates, timestamp.
    '''
    deduped = geo_df.dropDuplicates().alias('df')

    # cast both coordinate components to 'float'
    as_float = (
        deduped
        .withColumn('latitude', deduped['latitude'].cast('float'))
        .withColumn('longitude', deduped['longitude'].cast('float'))
    )

    # merge the cast components into one array column, in the order [latitude, longitude]
    with_coordinates = as_float.withColumn(
        'coordinates', array(as_float['latitude'], as_float['longitude'])
    )

    # cast the column named 'timestamp' to the type 'timestamp', and the column 'ind' to the type 'int'
    typed = (
        with_coordinates
        .withColumn('timestamp', with_coordinates['timestamp'].cast('timestamp'))
        .withColumn('ind', with_coordinates['ind'].cast('int'))
    )

    # selecting the output columns drops 'latitude' and 'longitude' and fixes the column order
    return typed.select('ind', 'country', 'coordinates', 'timestamp')

In [0]:
# Clean user data
def clean_user_data(user_df: pyspark.sql.dataframe.DataFrame):
    '''
    Cleans the DataFrame that contains information about users.

    The steps are: drop duplicate rows, build user_name from first_name and
    last_name separated by a space, cast date_joined to a timestamp type and
    ind and age to int, and select the output columns in a fixed order.

    Parameters
    ----------
    user_df : pyspark.sql.dataframe.DataFrame
        The DataFrame to be cleaned.
        The DataFrame must contain the columns: age, date_joined, first_name, ind, last_name.
    
    Returns
    -------
    df : pyspark.sql.dataframe.DataFrame
        The cleaned users data, with the columns: ind, user_name, age, date_joined.
    '''
    deduped = user_df.dropDuplicates().alias('df')

    # 'user_name' is 'first_name' and 'last_name' joined with a single space
    full_name = concat_ws(' ', deduped['first_name'], deduped['last_name'])
    named = deduped.withColumn('user_name', full_name)

    # cast 'date_joined' to a 'timestamp' type, and 'ind' and 'age' to an 'int' type
    typed = (
        named
        .withColumn('date_joined', named['date_joined'].cast('timestamp'))
        .withColumn('ind', named['ind'].cast('int'))
        .withColumn('age', named['age'].cast('int'))
    )

    # selecting the output columns drops 'first_name' and 'last_name' and fixes the column order
    return typed.select('ind', 'user_name', 'age', 'date_joined')

In [0]:
# Load the data, then clean it, and return the cleaned data
def load_cleaned_data():
    '''
    Loads the pin, geo, and user dataframes and cleans each one.

    Returns
    -------
    df_pin : pyspark.sql.dataframe.DataFrame
        The pin data returned by load_dfs, passed through clean_pin_data.
    df_geo : pyspark.sql.dataframe.DataFrame
        The geo data returned by load_dfs, passed through clean_geo_data.
    df_user : pyspark.sql.dataframe.DataFrame
        The user data returned by load_dfs, passed through clean_user_data.
    '''
    raw_pin, raw_geo, raw_user = load_dfs()
    # Clean in the same order the dataframes are returned: pin, geo, user
    return clean_pin_data(raw_pin), clean_geo_data(raw_geo), clean_user_data(raw_user)

# Queries

In [0]:
# Create session to run queries
# Build the Spark configuration one setting at a time
cfg = pyspark.SparkConf()
# Setting the master to run locally and with the maximum amount of cpu cores for multiprocessing.
cfg.setMaster(f"local[{multiprocessing.cpu_count()}]")
# Setting application name
cfg.setAppName("TestApp")
# Setting the "spark.eventLog.enabled" config value to False
cfg.set("spark.eventLog.enabled", False)
# Setting environment variables for executors to use
cfg.setExecutorEnv(pairs=[("VAR3", "value3"), ("VAR4", "value4")])
# Setting memory if this setting was not set previously
cfg.setIfMissing("spark.executor.memory", "1g")

# Get the session configured with cfg; getOrCreate is used, so the call may hand back an existing one
session = pyspark.sql.SparkSession.builder.config(conf=cfg).getOrCreate()

In [0]:
# Load cleaned data, then create TempViews for the queries
df_pin, df_geo, df_user = load_cleaned_data()

# Register each cleaned dataframe as a TempView so the SQL queries can refer to it by name
for _view_name, _frame in (
    ("pin_table", df_pin),
    ("geo_table", df_geo),
    ("user_table", df_user),
):
    _frame.createOrReplaceTempView(_view_name)

In [0]:
# The most popular category in each country (ties allowed. To disallow ties, use ROW_NUMBER instead of RANK).
# The CTE joins pin_table with geo_table on ind, counts the posts per (country, category) pair and ranks
# the categories inside each country by that count; the outer SELECT keeps only the rows with rank 1.
# Output columns: country, category, category_count.
query_popular_category_in_each_country = """
    WITH category_count_per_country AS
    (
        SELECT
            country,
            category,
            COUNT(*) AS category_count,
            RANK() OVER (
                PARTITION BY country ORDER BY COUNT(pin_table.ind) DESC
            ) AS rank
        FROM
            pin_table
        JOIN
            geo_table ON pin_table.ind = geo_table.ind
        GROUP BY
            country, category
    )
    SELECT
        country,
        category,
        category_count
    FROM
        category_count_per_country
    WHERE
        rank = 1
"""

# Run the query on the session and print the first rows of the result
popular_category_in_each_country_df = session.sql(query_popular_category_in_each_country)
popular_category_in_each_country_df.show()

In [0]:
# Most popular category each year (ties allowed. To disallow ties, use ROW_NUMBER instead of RANK).
# The CTE joins pin_table with geo_table on ind, counts the posts per (YEAR(timestamp), category) pair and
# ranks the categories inside each year by that count; the outer SELECT keeps only the rows with rank 1.
# The query does not restrict the range of years.
# Output columns: post_year, category, category_count.
query_popular_category_each_year = """
    WITH category_count_per_year AS (
    SELECT
        YEAR(timestamp) AS post_year,
        category,
        COUNT(*) AS category_count,
        RANK() OVER (
            PARTITION BY
                YEAR(timestamp)
            ORDER BY
                COUNT(pin_table.ind) DESC
        ) AS rank
    FROM
        pin_table
    JOIN
        geo_table ON pin_table.ind = geo_table.ind
    GROUP BY
        YEAR(timestamp), category
    )
    SELECT
        post_year,
        category,
        category_count
    FROM
        category_count_per_year
    WHERE
        rank = 1   
"""

# Run the query on the session and print the first rows of the result
popular_category_each_year_df = session.sql(query_popular_category_each_year)
popular_category_each_year_df.show()

In [0]:
# Most followers in each country (ties allowed. To disallow ties, use ROW_NUMBER instead of RANK).
# The CTE joins pin_table with geo_table on ind; the GROUP BY over (poster_name, country, follower_count)
# collapses repeated rows for the same combination before ranking. Posters are ranked inside each country
# by follower_count, and the outer SELECT keeps only the rows with rank 1.
# Output columns: country, poster_name, follower_count.
query_most_followers_per_country = """
    WITH ranked_poster_country_followers_table AS (
    SELECT
        country,
        poster_name,
        follower_count,
        RANK() OVER (
            PARTITION BY
                country
            ORDER BY
                follower_count DESC
        ) AS rank
    FROM
        pin_table
    JOIN
        geo_table ON pin_table.ind = geo_table.ind
    GROUP BY
        poster_name, country, follower_count
    )
    SELECT
        country,
        poster_name,
        follower_count
    FROM
        ranked_poster_country_followers_table
    WHERE
        rank = 1
"""

# Run the query on the session and print the first rows of the result
most_followers_per_country_df = session.sql(query_most_followers_per_country)
most_followers_per_country_df.show()

# Country with most followers: expose the per-country result as a TempView, then take the single row
# with the highest follower_count (LIMIT 1 returns one row even when several countries share that value).
most_followers_per_country_df.createOrReplaceTempView("most_followers_per_country_table")
query_country_with_most_followers = """
    SELECT
        country,
        follower_count
    FROM
        most_followers_per_country_table
    ORDER BY
        follower_count DESC
    LIMIT 1
"""

# Run the query on the session and print the result
country_with_most_followers_df = session.sql(query_country_with_most_followers)
country_with_most_followers_df.show()

In [0]:
# Most popular category for different age groups (ties allowed. To disallow ties, use ROW_NUMBER instead of RANK).
# First CTE: joins pin_table with user_table on ind and maps age to one of the labels
# '18-24', '25-35', '36-50' or '+50'.
# Second CTE: counts the posts per (age_group, category) pair and ranks the categories inside each age group.
# The outer SELECT keeps only the rows with rank 1. Output columns: age_group, category, category_count.
# NOTE(review): '+50' is the ELSE branch, so it also receives every age the three BETWEEN ranges do not match
# (ages below 18 and NULL ages) - confirm such ages do not occur in the data.
query_popular_category_per_age_group = """
    WITH age_group_category_table AS
    (
        SELECT
            category,
            CASE
                WHEN age BETWEEN 18 AND 24 THEN '18-24'
                WHEN age BETWEEN 25 AND 35 THEN '25-35'
                WHEN age BETWEEN 36 AND 50 THEN '36-50'
                ELSE '+50'
            END AS age_group
        FROM
            pin_table
        JOIN
            user_table ON pin_table.ind = user_table.ind
    ),
    ranked_age_group_category_table AS
    (
        SELECT
            age_group,
            category,
            COUNT(*) AS category_count,
            RANK() OVER (
                PARTITION BY
                    age_group
                ORDER BY
                    COUNT(*) DESC
            ) AS rank
        FROM
            age_group_category_table
        GROUP BY
            age_group, category
    )
    SELECT
        age_group,
        category,
        category_count
    FROM
        ranked_age_group_category_table
    WHERE
        rank = 1
"""

# Run the query on the session and print the first rows of the result
popular_category_per_age_group_df = session.sql(query_popular_category_per_age_group)
popular_category_per_age_group_df.show()

In [0]:
# Median follower count for different age groups.
# The CTE joins pin_table with user_table on ind and maps age to one of the labels
# '18-24', '25-35', '36-50' or '+50'; the outer SELECT computes PERCENTILE_CONT(0.5) of follower_count
# per age group. The ORDER BY uses a CASE expression so the groups are listed youngest to oldest
# rather than in the alphabetical order of their labels.
# Output columns: age_group, median_follower_count.
# NOTE(review): '+50' is the ELSE branch, so it also receives ages below 18 and NULL ages.
query_median_follower_count_per_age_group = """
    WITH age_group_follower_count_table AS
    (
        SELECT
            follower_count,
            CASE
                WHEN age BETWEEN 18 AND 24 THEN '18-24'
                WHEN age BETWEEN 25 AND 35 THEN '25-35'
                WHEN age BETWEEN 36 AND 50 THEN '36-50'
                ELSE '+50'
            END AS age_group
        FROM
            pin_table
        JOIN
            user_table ON pin_table.ind = user_table.ind
    )
    SELECT
        age_group,
        PERCENTILE_CONT(0.5) WITHIN GROUP(ORDER BY follower_count) AS median_follower_count
    FROM
        age_group_follower_count_table
    GROUP BY
        age_group
    ORDER BY
        CASE
            WHEN age_group = '18-24' THEN 10
            WHEN age_group = '25-35' THEN 20
            WHEN age_group = '36-50' THEN 30
            ELSE 100
        END
"""

# Run the query on the session and print the first rows of the result
median_follower_count_per_age_group_df = session.sql(query_median_follower_count_per_age_group)
median_follower_count_per_age_group_df.show()

In [0]:
# Number of users joined each year between 2015-2020.
# Counts the rows of user_table per YEAR(date_joined), keeps the years 2015 to 2020 (both inclusive)
# and orders the result by year.
# Output columns: post_year, number_users_joined.
# NOTE(review): the year column is aliased post_year although it is derived from date_joined.
query_users_joined_per_year = """
    SELECT
        YEAR(date_joined) AS post_year,
        COUNT(*) AS number_users_joined
    FROM
        user_table
    GROUP BY
        YEAR(date_joined)
    HAVING
        YEAR(date_joined) BETWEEN 2015 AND 2020
    ORDER BY
        post_year
"""

# Run the query on the session and print the first rows of the result
query_users_joined_per_year_df = session.sql(query_users_joined_per_year)
query_users_joined_per_year_df.show()

In [0]:
# Median follower count of users based on their joining year.
# Joins user_table with pin_table on ind, computes PERCENTILE_CONT(0.5) of follower_count per
# YEAR(date_joined), keeps the years 2015 to 2020 (both inclusive) and orders the result by year.
# Output columns: year_joined, median_follower_count.
# The pin data must be referenced as pin_table: that is the name the cleaned pin dataframe is registered
# under with createOrReplaceTempView, and no view called pin_data is created in this notebook.
query_median_follower_count_per_joining_year = """
    SELECT
        YEAR(date_joined) AS year_joined,
        PERCENTILE_CONT(0.5) WITHIN GROUP(ORDER BY follower_count) AS median_follower_count
    FROM
        user_table
    JOIN
        pin_table ON user_table.ind = pin_table.ind
    GROUP BY
        YEAR(date_joined)
    HAVING
        YEAR(date_joined) BETWEEN 2015 and 2020
    ORDER BY
        year_joined
"""

# Run the query on the session and print the first rows of the result
median_follower_count_per_joining_year_df = session.sql(query_median_follower_count_per_joining_year)
median_follower_count_per_joining_year_df.show()

In [0]:
# Median follower count of users based on joining year and age group.
# The CTE joins pin_table with geo_table and user_table on ind, maps age to one of the labels
# '18-24', '25-35', '36-50' or '+50', and takes the year from the timestamp column.
# The outer SELECT computes PERCENTILE_CONT(0.5) of follower_count per (age_group, post_year) and orders
# the result by age group (youngest to oldest, via the CASE expression) and then by year.
# Output columns: age_group, post_year, median_follower_count.
# NOTE(review): the title and the variable names say "joining year", but the query groups by
# YEAR(timestamp) (the timestamp column is the one kept by clean_geo_data), not by YEAR(date_joined),
# and it applies no 2015-2020 filter - confirm which year is intended.
# NOTE(review): '+50' is the ELSE branch, so it also receives ages below 18 and NULL ages.
query_median_follower_count_per_age_group_and_joining_year = """
    WITH follower_count_age_group_post_year_table AS
    (
        SELECT
            CASE
                WHEN age BETWEEN 18 AND 24 THEN '18-24'
                WHEN age BETWEEN 25 AND 35 THEN '25-35'
                WHEN age BETWEEN 36 AND 50 THEN '36-50'
                ELSE '+50' 
            END AS age_group,
            YEAR(timestamp) AS post_year,
            follower_count
        FROM
            pin_table
        JOIN
            geo_table ON pin_table.ind = geo_table.ind
        JOIN
            user_table ON pin_table.ind = user_table.ind
    )
    SELECT
        age_group,
        post_year,
        PERCENTILE_CONT(0.5) WITHIN GROUP(ORDER BY follower_count) AS median_follower_count
    FROM
        follower_count_age_group_post_year_table
    GROUP BY
        age_group, post_year
    ORDER BY
        CASE
            WHEN age_group = '18-24' THEN 10
            WHEN age_group = '25-35' THEN 20
            WHEN age_group = '36-50' THEN 30
            ELSE 100
        END,
        post_year
"""

# Run the query on the session and print the first rows of the result
median_follower_count_per_age_group_and_joining_year_df = session.sql(query_median_follower_count_per_age_group_and_joining_year)
median_follower_count_per_age_group_and_joining_year_df.show()

# DataFrame schema for reference

In [0]:
# Pin data: print the column names and types of the cleaned pin dataframe
df_pin.printSchema()

In [0]:
# Geolocation data: print the column names and types of the cleaned geolocation dataframe
df_geo.printSchema()

In [0]:
# User data: print the column names and types of the cleaned user dataframe
df_user.printSchema()