## Join-Skew Experiments on IMDb Data

### Load Libraries

In [1]:
import findspark
findspark.init()
import time
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

### Configurations

In [2]:
def conf2_setup():
    """Create (or reuse) the local Spark session for configuration 2.

    Configuration 2 runs with adaptive query execution (AQE) disabled, so no
    runtime re-optimisation (partition coalescing, skew-join splitting) is
    applied. It uses 8 local cores, 16 shuffle partitions and 20G driver memory.

    If a SparkSession already exists in this process, ``getOrCreate`` returns
    it. Settings that only take effect at startup (e.g. ``spark.driver.memory``)
    are then not changed.

    Returns:
        The SparkSession produced by ``SparkSession.builder.getOrCreate()``.
    """
    conf = (
        SparkConf()
        .set('spark.driver.memory', '20G')
        .set('spark.sql.shuffle.partitions', '16')
        .set('spark.sql.adaptive.enabled', 'false')
    )

    spark_session = (
        SparkSession
        .builder
        .master('local[8]')
        .config(conf=conf)
        .appName("conf2")
        .getOrCreate()
    )

    # Use the session built above; there is no module-level `spark` variable.
    url = spark_session.sparkContext.uiWebUrl
    print("Spark web UI: ", url)

    return spark_session

In [3]:
def conf3_setup():
    """Create (or reuse) the local Spark session for configuration 3.

    Configuration 3 enables adaptive query execution (AQE) with partition
    coalescing and skew-join handling. Skew-join factor and threshold are left
    at Spark's defaults.

    ``spark.sql.autoBroadcastJoinThreshold = -1`` disables automatic broadcast
    joins. Joins therefore run as shuffle joins, which is where AQE skew-join
    handling applies.

    If a SparkSession already exists in this process, ``getOrCreate`` returns
    it. Settings that only take effect at startup (e.g. ``spark.driver.memory``)
    are then not changed.

    Returns:
        The SparkSession produced by ``SparkSession.builder.getOrCreate()``.
    """
    conf = (
        SparkConf()
        .set('spark.driver.memory', '20G')
        .set('spark.sql.shuffle.partitions', '16')
        .set('spark.sql.autoBroadcastJoinThreshold', '-1')
        .set('spark.sql.adaptive.enabled', 'true')
        .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .set("spark.sql.adaptive.skewJoin.enabled", "true")
    )

    spark_session = (
        SparkSession
        .builder
        .master('local[8]')
        .config(conf=conf)
        .appName("conf3")
        .getOrCreate()
    )

    # Use the session built above; there is no module-level `spark` variable.
    url = spark_session.sparkContext.uiWebUrl
    print("Spark web UI: ", url)

    return spark_session

In [4]:
def conf5_setup():
    """Create (or reuse) the local Spark session for configuration 5.

    This is identical to configuration 3, with one change:
    ``spark.sql.adaptive.skewJoin.skewedPartitionFactor`` is lowered to 3.
    AQE therefore classifies a partition as skewed at a smaller multiple of the
    median partition size. The byte-size skew threshold still applies and is
    left at Spark's default here.

    If a SparkSession already exists in this process, ``getOrCreate`` returns
    it. Settings that only take effect at startup (e.g. ``spark.driver.memory``)
    are then not changed.

    Returns:
        The SparkSession produced by ``SparkSession.builder.getOrCreate()``.
    """
    conf = (
        SparkConf()
        .set('spark.driver.memory', '20G')
        .set('spark.sql.shuffle.partitions', '16')
        .set('spark.sql.autoBroadcastJoinThreshold', '-1')
        .set('spark.sql.adaptive.enabled', 'true')
        .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .set("spark.sql.adaptive.skewJoin.enabled", "true")
        .set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3")
    )

    spark_session = (
        SparkSession
        .builder
        .master('local[8]')
        .config(conf=conf)
        .appName("conf5")
        .getOrCreate()
    )

    # Use the session built above; there is no module-level `spark` variable.
    url = spark_session.sparkContext.uiWebUrl
    print("Spark web UI: ", url)

    return spark_session

### Queries

In [5]:
def conf2_query9():
    """Run the artificially skewed principals/names join under configuration 2.

    Steps:
      1. Rewrite the 4000 most frequent ``nconst`` ids in
         ``title.principals.tsv`` to the single id ``"nm0914844"``, creating a
         heavily skewed join key.
      2. Inner-join the result with ``name.basics.tsv`` on ``nconst``.
      3. Print the average ``primaryName`` length per ``birthYear``
         (up to 500 rows, untruncated).

    Configuration 2 has AQE disabled, so no automatic skew mitigation occurs.
    """
    session = conf2_setup()

    def read_tsv(path):
        # Tab-separated file with a header row; column types are inferred.
        reader = session.read.format("csv")
        reader = reader.option("header", "true")
        reader = reader.option("delimiter", "\t")
        reader = reader.option('inferSchema','true')
        return reader.load(path)

    title_principals = read_tsv(
        "file:///Users/jiashu/Documents/UW_CSE544/project/experiments_finals/data/data2/title.principals.tsv"
    )
    name_basics = read_tsv(
        "file:///Users/jiashu/Documents/UW_CSE544/project/experiments_finals/data/data2/name.basics.tsv"
    )

    # Ids of the 4000 people with the most principal rows.
    row_counts = (
        title_principals
        .groupBy('nconst')
        .agg(F.count(F.lit(1)).alias("num_rows"))
        .sort(F.col("num_rows").desc())
    )
    frequent_ids = (
        row_counts
        .select('nconst')
        .limit(4000)
        .rdd
        .map(lambda row: row[0])
        .collect()
    )

    # Collapse all of those ids onto one key to exaggerate skew.
    skewed_key = (
        F.when(F.col('nconst').isin(frequent_ids), F.lit("nm0914844"))
        .otherwise(F.col('nconst'))
    )
    title_principals = title_principals.withColumn('nconst', skewed_key)

    # To inspect the resulting skew, run:
    #   title_principals.groupBy('nconst') \
    #       .agg(F.count(F.lit(1)).alias("num_rows")) \
    #       .sort(F.col("num_rows").desc()) \
    #       .show(truncate=False, n=30)

    join_condition = title_principals['nconst'] == name_basics['nconst']
    joined = title_principals.join(name_basics, join_condition, 'inner')
    per_year = joined.groupBy("birthYear").agg(
        F.avg(F.length('primaryName')).alias("avg_name_length")
    )
    per_year.show(n=500, truncate=False)

In [6]:
def conf2_query10(salting_n=5):
    """Run the skewed principals/names join with manual key salting (config 2).

    Builds the same artificial skew as the other queries: the 4000 most
    frequent ``nconst`` ids in ``title.principals.tsv`` are rewritten to
    ``"nm0914844"``. The hot key is then salted before joining with
    ``name.basics.tsv``, and the average ``primaryName`` length per
    ``birthYear`` is printed (up to 500 rows).

    Args:
        salting_n: number of salt buckets (default 5). Each principals row gets
            a random salt in ``[0, salting_n)``. Every names row is replicated
            once per salt value, so each principals row still matches its
            name row exactly once.

    Raises:
        ValueError: if ``salting_n`` is less than 1.
    """
    if salting_n < 1:
        raise ValueError(f"salting_n must be >= 1, got {salting_n}")

    # set up spark session
    spark = conf2_setup()

    # read data
    principals = spark.read.format("csv")\
        .option("header", "true")\
        .option("delimiter", "\t")\
        .option('inferSchema','true')\
        .load("file:///Users/jiashu/Documents/UW_CSE544/project/experiments_finals/data/data2/title.principals.tsv")

    names = spark.read.format("csv")\
        .option("header", "true")\
        .option("delimiter", "\t")\
        .option('inferSchema','true')\
        .load("file:///Users/jiashu/Documents/UW_CSE544/project/experiments_finals/data/data2/name.basics.tsv")

    # Find the 4000 ids with the most principal rows.
    top4000 = principals\
        .groupBy('nconst')\
        .agg(F.count(F.lit(1)).alias("num_rows")).sort(F.col("num_rows").desc())\
        .select('nconst')\
        .limit(4000)

    name_to_change = top4000.rdd.map(lambda x: x[0]).collect()

    # Collapse those ids onto a single key to make the join heavily skewed.
    principals = principals.withColumn('nconst',
                   F.when(F.col('nconst').isin(name_to_change), F.lit("nm0914844"))
                   .otherwise(F.col('nconst')))

    # Key salting. F.rand yields values in [0, 1), so floor(rand * salting_n)
    # covers every bucket 0..salting_n-1. Multiplying by (salting_n - 1)
    # would leave the last bucket unused. The fixed seed makes the salt
    # assignment repeatable for the same input partitioning.
    principals = principals\
        .withColumn("salt_key", F.floor(F.rand(222) * salting_n))

    # Replicate every names row once per salt value so that any salted
    # principals row finds exactly one matching names row.
    names = names \
        .withColumn('key2', F.array([F.lit(num) for num in range(salting_n)]))\
        .withColumn('key2', F.explode(F.col('key2')))

    # join two tables on (nconst, salt) and group by birth year
    principals.join(names, (principals['nconst'] == names['nconst']) & (principals['salt_key'] == names['key2']),
                           'inner')\
        .groupBy("birthYear") \
        .agg(F.avg(F.length('primaryName')).alias("avg_name_length")) \
        .show(truncate = False, n = 500)

In [7]:
def conf3_query11():
    """Run the artificially skewed principals/names join under configuration 3.

    Steps:
      1. Rewrite the 4000 most frequent ``nconst`` ids in
         ``title.principals.tsv`` to the single id ``"nm0914844"``.
      2. Inner-join the result with ``name.basics.tsv`` on ``nconst``.
      3. Print the average ``primaryName`` length per ``birthYear``
         (up to 500 rows, untruncated).

    Configuration 3 enables AQE skew-join handling with broadcast joins
    disabled.
    """
    session = conf3_setup()

    def read_tsv(path):
        # Tab-separated file with a header row; column types are inferred.
        reader = session.read.format("csv")
        reader = reader.option("header", "true")
        reader = reader.option("delimiter", "\t")
        reader = reader.option('inferSchema','true')
        return reader.load(path)

    title_principals = read_tsv(
        "file:///Users/jiashu/Documents/UW_CSE544/project/experiments_finals/data/data2/title.principals.tsv"
    )
    name_basics = read_tsv(
        "file:///Users/jiashu/Documents/UW_CSE544/project/experiments_finals/data/data2/name.basics.tsv"
    )

    # Ids of the 4000 people with the most principal rows.
    row_counts = (
        title_principals
        .groupBy('nconst')
        .agg(F.count(F.lit(1)).alias("num_rows"))
        .sort(F.col("num_rows").desc())
    )
    frequent_ids = (
        row_counts
        .select('nconst')
        .limit(4000)
        .rdd
        .map(lambda row: row[0])
        .collect()
    )

    # Collapse all of those ids onto one key to exaggerate skew.
    skewed_key = (
        F.when(F.col('nconst').isin(frequent_ids), F.lit("nm0914844"))
        .otherwise(F.col('nconst'))
    )
    title_principals = title_principals.withColumn('nconst', skewed_key)

    join_condition = title_principals['nconst'] == name_basics['nconst']
    joined = title_principals.join(name_basics, join_condition, 'inner')
    per_year = joined.groupBy("birthYear").agg(
        F.avg(F.length('primaryName')).alias("avg_name_length")
    )
    per_year.show(n=500, truncate=False)

In [8]:
def conf5_query12():
    """Run the artificially skewed principals/names join under configuration 5.

    Steps:
      1. Rewrite the 4000 most frequent ``nconst`` ids in
         ``title.principals.tsv`` to the single id ``"nm0914844"``.
      2. Inner-join the result with ``name.basics.tsv`` on ``nconst``.
      3. Print the average ``primaryName`` length per ``birthYear``
         (up to 500 rows, untruncated).

    Configuration 5 is configuration 3 with a lower AQE skewed-partition
    factor (3).
    """
    session = conf5_setup()

    def read_tsv(path):
        # Tab-separated file with a header row; column types are inferred.
        reader = session.read.format("csv")
        reader = reader.option("header", "true")
        reader = reader.option("delimiter", "\t")
        reader = reader.option('inferSchema','true')
        return reader.load(path)

    title_principals = read_tsv(
        "file:///Users/jiashu/Documents/UW_CSE544/project/experiments_finals/data/data2/title.principals.tsv"
    )
    name_basics = read_tsv(
        "file:///Users/jiashu/Documents/UW_CSE544/project/experiments_finals/data/data2/name.basics.tsv"
    )

    # Ids of the 4000 people with the most principal rows.
    row_counts = (
        title_principals
        .groupBy('nconst')
        .agg(F.count(F.lit(1)).alias("num_rows"))
        .sort(F.col("num_rows").desc())
    )
    frequent_ids = (
        row_counts
        .select('nconst')
        .limit(4000)
        .rdd
        .map(lambda row: row[0])
        .collect()
    )

    # Collapse all of those ids onto one key to exaggerate skew.
    skewed_key = (
        F.when(F.col('nconst').isin(frequent_ids), F.lit("nm0914844"))
        .otherwise(F.col('nconst'))
    )
    title_principals = title_principals.withColumn('nconst', skewed_key)

    join_condition = title_principals['nconst'] == name_basics['nconst']
    joined = title_principals.join(name_basics, join_condition, 'inner')
    per_year = joined.groupBy("birthYear").agg(
        F.avg(F.length('primaryName')).alias("avg_name_length")
    )
    per_year.show(n=500, truncate=False)

In [11]:
if __name__ == '__main__':
    # Time one experiment end to end. perf_counter is monotonic, so system
    # clock adjustments cannot distort the measurement (time.time can jump).
    # Note: the measured time includes Spark session creation and the CSV
    # schema-inference reads, not just the join query.
    start_time = time.perf_counter()

    # Run exactly one experiment per process. A second call would reuse the
    # first SparkSession via getOrCreate, so its startup settings would not
    # change.
    # conf2_query9()   # config 2: AQE off, no mitigation
    # conf2_query10()  # config 2: AQE off, manual key salting

    conf3_query11()    # config 3: AQE skew-join handling, broadcast disabled
    # conf5_query12()  # config 5: config 3 with skewedPartitionFactor = 3

    end_time = time.perf_counter()
    print(f"Elapsed_time: {end_time - start_time} seconds")

ConnectionRefusedError: [Errno 61] Connection refused