# Part 1

In [0]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[?25l[K     |                                | 10 kB 30.2 MB/s eta 0:00:11[K     |                                | 20 kB 36.8 MB/s eta 0:00:09[K     |                                | 30 kB 47.6 MB/s eta 0:00:07[K     |                                | 40 kB 55.9 MB/s eta 0:00:06[K     |                                | 51 kB 61.9 MB/s eta 0:00:06[K     |                                | 61 kB 68.0 MB/s eta 0:00:05[K     |                                | 71 kB 46.4 MB/s eta 0:00:07[K     |                                | 81 kB 44.7 MB/s eta 0:00:07[K     |                                | 92 kB 44.9 MB/s eta 0:00:07[K     |                                | 102 kB 42.6 MB/s eta 0:00:08[K     |                                | 112 kB 42.6 MB/s eta 0:00:08[K     |                                | 122 kB 42.6 MB/s eta 0:00:08[K     |                                | 133 kB 42.6 MB/s eta 0:00:08

### Reading the Data

In [0]:
import os
import findspark
findspark.init()
from pyspark.sql.types import *
from pyspark.sql.functions import *
 
from pyspark import SparkContext
from pyspark.sql import SparkSession

def init_spark(app_name: str):
    """Create (or reuse) a SparkSession registered under ``app_name``.

    Returns:
        A ``(spark, sc)`` tuple: the session and the SparkContext that backs it.
    """
    session = SparkSession.builder.appName(app_name).getOrCreate()
    return session, session.sparkContext

# Build the session through init_spark instead of repeating its builder logic here.
spark, sc = init_spark("my project 1")
sc
 
# Read a pipe-delimited, header-less CSV file (or directory of files) into a DataFrame
def load_PD_file(filename_or_dir, schema, base_dir="/mnt/ddscoursedatastorage/fwm-stb-data/"):
    """Load a pipe-delimited, header-less CSV dataset using an explicit schema.

    Args:
        filename_or_dir: File or directory name, appended verbatim to ``base_dir``
            (so ``base_dir`` is expected to end with a '/').
        schema: Spark ``StructType`` applied to the columns. The files have no
            header row, so fields must be listed in on-disk column order.
        base_dir: Location of the course data. The default is the mount point the
            notebook has always read from.

    Returns:
        A Spark DataFrame over the requested file(s).
    """
    data_path = base_dir + filename_or_dir
    return (
        spark.read.format("csv")
        .option("header", "false")
        .option("delimiter", "|")
        .schema(schema)
        .load(data_path)
    )

# Reading the Reference Parquet files and normalising the raw "_kebab-case" column names
ref_column_renames = {
    "_device-id": "device_id",
    "_dma": "dma",
    "_dma-code": "dma_code",
    "_household-id": "household_id",
    "_household-type": "household_type",
    "_system-type": "system_type",
    "_zipcode": "zipcode",
}

ref_data_df = spark.read.parquet('/ref_data_raw')
for raw_name, clean_name in ref_column_renames.items():
    ref_data_df = ref_data_df.withColumnRenamed(raw_name, clean_name)

# Reading the Daily Programs CSV file.
# The file has no header, so the fields below must follow on-disk column order. Names use the
# same lower snake_case as every other schema in this notebook and as the later references
# (e.g. col('duration')), so they resolve even if Spark case sensitivity is turned on.
# NOTE(review): air_date is later parsed as 'yyyyMMdd'; the air_time string format is not
# visible here — confirm it is something hour() can parse.
daily_prog_schema = StructType([
    StructField('prog_code', StringType()),
    StructField('title', StringType()),
    StructField('genre', StringType()),
    StructField('air_date', StringType()),
    StructField('air_time', StringType()),
    StructField('duration', FloatType()),
])
program_data_df = load_PD_file("Daily program data/", daily_prog_schema)

 
# Reading the 2.5% sample of the viewing data from a Parquet file
viewing_data_df = spark.read.format('parquet').load('/sample_viewing_2_5percent')

# Reading the Demographic CSV file.
# The file has no header, so (name, type) pairs are listed in on-disk column order.
demographic_columns = [
    ('household_id', StringType()),
    ('household_size', IntegerType()),
    ('num_adults', IntegerType()),
    ('num_generations', IntegerType()),
    ('adult_range', StringType()),
    ('marital_status', StringType()),
    ('race_code', StringType()),
    ('presence_children', StringType()),
    ('num_children', IntegerType()),
    ('age_children', StringType()),  # format like range - 'bitwise'
    ('age_range_children', StringType()),
    ('dwelling_type', StringType()),
    ('home_owner_status', StringType()),
    ('length_residence', IntegerType()),
    ('home_market_value', StringType()),
    ('num_vehicles', IntegerType()),
    ('vehicle_make', StringType()),
    ('vehicle_model', StringType()),
    ('vehicle_year', IntegerType()),
    ('net_worth', IntegerType()),
    ('income', StringType()),  # may hold letter codes; mapped to numbers during preprocessing
    ('gender_individual', StringType()),
    ('age_individual', IntegerType()),
    ('education_highest', StringType()),
    ('occupation_highest', StringType()),
    ('education_1', StringType()),
    ('occupation_1', StringType()),
    ('age_2', IntegerType()),
    ('education_2', StringType()),
    ('occupation_2', StringType()),
    ('age_3', IntegerType()),
    ('education_3', StringType()),
    ('occupation_3', StringType()),
    ('age_4', IntegerType()),
    ('education_4', StringType()),
    ('occupation_4', StringType()),
    ('age_5', IntegerType()),
    ('education_5', StringType()),
    ('occupation_5', StringType()),
    ('polit_party_regist', StringType()),
    ('polit_party_input', StringType()),
    ('household_clusters', StringType()),
    ('insurance_groups', StringType()),
    ('financial_groups', StringType()),
    ('green_living', StringType()),
]
demographic_schema = StructType(
    [StructField(column_name, column_type) for column_name, column_type in demographic_columns]
)

demographic_data_df = load_PD_file("demographic/", demographic_schema)


# Part 1

## 1.1

#### Preprocessing

In [0]:
# Preprocess the raw inputs: keep only the columns the conditions need.
# The reference frame also loses rows with any null and is de-duplicated;
# the demographic frame is de-duplicated.
program_data_df_2 = program_data_df.drop('title')

ref_data_df_2 = (
    ref_data_df
    .select("household_id", "device_id", "dma")
    .dropna()
    .dropDuplicates()
)

viewing_data_df_2 = viewing_data_df.select("device_id", "event_date", "prog_code")

demographic_data_df_2 = (
    demographic_data_df
    .select("household_id", "household_size", "num_adults", "net_worth", "income")
    .distinct()
)

#### Demographic data: DataFrame transformations

In [0]:
# NOTE(review): `.cache()` registers the frame built by the preprocessing cell. The name is
# rebound below, so the `.unpersist()` at the end of this cell targets a different,
# never-cached frame, and this cache entry is never released. Confirm that is intended.
demographic_data_df_2.cache() #caching demographic

# Income letter codes 'A'-'D' stand for 10-13; all other values are kept as-is.
# The int branches combined with the string `otherwise` leave this column string-typed;
# it is cast to double further down, once null-based filtering is done.
mapping = {'A': 10, 'B': 11, 'C': 12, 'D': 13}

demographic_data_df_2 = demographic_data_df_2.withColumn(
    'income',
    when(col('income') == 'A', mapping['A'])
    .when(col('income') == 'B', mapping['B'])
    .when(col('income') == 'C', mapping['C'])
    .when(col('income') == 'D', mapping['D'])
    .otherwise(col('income')),
)

# Drop households that have no value in any of num_adults / income / net_worth
demographic_data_df_2 = demographic_data_df_2.filter(
    col('num_adults').isNotNull()
    | col('income').isNotNull()
    | col('net_worth').isNotNull()
)

# cond_3: fewer than 3 adults and net worth above 8 (a null in either column gives 0)
demographic_data_df_2 = demographic_data_df_2.withColumn(
    'cond_3',
    when((col('num_adults') < 3) & (col('net_worth') > 8), 1).otherwise(0),
).drop('num_adults', 'net_worth')

# Make income explicitly numeric. Values that are not numbers become null; the average and
# comparison below would have treated them the same way through implicit casts.
demographic_data_df_2 = demographic_data_df_2.withColumn('income', col('income').cast('double'))

# Average income over households that have one; used as the lower_than_avg threshold
avg_income = (
    demographic_data_df_2
    .filter(col('income').isNotNull())
    .agg(avg('income').alias('avg_income'))
    .first()['avg_income']
)
demographic_data_df_2 = demographic_data_df_2.withColumn(
    'lower_than_avg', when(col('income') < avg_income, 1).otherwise(0)
).drop('income')

# Flag large families (household_size >= 8; null gives 0) and drop duplicate rows
demographic_data_df_2 = (
    demographic_data_df_2
    .withColumn('large_family', when(col('household_size') >= 8, 1).otherwise(0))
    .drop('household_size')
    .distinct()
)

demographic_data_df_2.unpersist() #unpersisting

Out[16]: DataFrame[household_id: string, cond_3: int, lower_than_avg: int, large_family: int]

#### Viewing data: DataFrame transformations

In [0]:
# NOTE(review): `.cache()` registers the frame built by the preprocessing cell. The name is
# rebound below, so the `.unpersist()` at the end of this cell targets a different,
# never-cached frame, and this cache entry stays registered. Confirm that is intended.
viewing_data_df_2.cache()  # caching viewing dataframe

# Per-device activity statistics, computed in a single aggregation:
#   total_events - number of viewing rows for the device
#   num_days     - number of distinct non-null event dates for the device
# This matches summing per-day counts and counting distinct (device, date) pairs,
# but needs one aggregation and one join instead of two of each.
device_activity_df = viewing_data_df_2.groupBy('device_id').agg(
    count('*').alias('total_events'),
    countDistinct('event_date').alias('num_days'),
)

# Attach the statistics to every viewing row and collapse to one row per (device, program).
# Then flag devices that average fewer than 5 events per active day (cond_1).
viewing_data_df_2 = (
    viewing_data_df_2
    .join(device_activity_df, on='device_id', how='left')
    .drop('event_date')
    .distinct()
    .withColumn('avg_daily_events', col('total_events') / col('num_days'))
    .withColumn('cond_1', when(col('avg_daily_events') < 5, 1).otherwise(0))
    .drop('avg_daily_events', 'total_events', 'num_days')
)

viewing_data_df_2.unpersist()  # Unpersist the DataFrame

Out[17]: DataFrame[device_id: string, prog_code: string, cond_1: int]

#### Reference data: DataFrame transformations

In [0]:
ref_data_df_2.cache()  # cache the preprocessed reference frame (the name is rebound below)

# cond_2: the device's DMA name contains the letter 'z' in either case (null gives 0)
ref_data_df_2 = ref_data_df_2.withColumn(
    'cond_2', when(col('dma').rlike('[zZ]'), 1).otherwise(0)
).drop('dma')

# Number of distinct devices per household
household_devices_df = (
    ref_data_df_2
    .select('household_id', 'device_id')
    .groupBy('household_id')
    .agg(countDistinct('device_id').alias('num_devices'))
)

# Attach the device count, flag households with more than 3 devices, and de-duplicate
ref_data_df_2 = (
    ref_data_df_2
    .join(household_devices_df, on='household_id', how="left")
    .withColumn('more_than_3_dev', when(col('num_devices') > 3, 1).otherwise(0))
    .drop('num_devices')
    .distinct()
)

ref_data_df_2.unpersist()  # Unpersist the DataFrame

Out[18]: DataFrame[household_id: bigint, device_id: string, cond_2: int, more_than_3_dev: int]

#### Program data: DataFrame transformations

In [0]:
program_data_df_2.cache()  # cache the preprocessed program frame (the name is rebound below)

# air_day: day-of-week number of the air date (air_date is parsed as yyyyMMdd)
program_data_df_2 = program_data_df_2.withColumn(
    'air_day', dayofweek(to_date(col('air_date'), 'yyyyMMdd'))
)

# Programs that aired on air_day 6 from hour 18 on, or on air_day 7 up to hour 19.
# NOTE(review): with Spark's dayofweek numbering (1 = Sunday) this is Friday evening through
# Saturday 19:59. hour() on the string air_time depends on Spark parsing it as a time —
# confirm the air_time format.
in_weekend_slot = (
    ((col('air_day') == 6) & (hour(col('air_time')) >= 18))
    | ((col('air_day') == 7) & (hour(col('air_time')) <= 19))
)
weekend_shows = [
    row.prog_code
    for row in program_data_df_2.filter(in_weekend_slot).select('prog_code').distinct().collect()
]

# cond_6: the comma-separated genre list contains one of these genres and duration > 35.
# NOTE(review): genres are split on ',' without trimming — confirm there are no spaces after commas.
cond_6_genres = ['Talk', 'Politics', 'News', 'Community', 'Crime']
genre_list = split(col('genre'), ',')
matches_cond_6_genre = array_contains(genre_list, cond_6_genres[0])
for genre_name in cond_6_genres[1:]:
    matches_cond_6_genre = matches_cond_6_genre | array_contains(genre_list, genre_name)

program_data_df_2 = (
    program_data_df_2
    .withColumn('cond_6', when(matches_cond_6_genre & (col('duration') > 35), 1).otherwise(0))
    .drop('genre', 'duration')
    .distinct()
)

program_data_df_2.unpersist()  # Unpersist the DataFrame

Out[19]: DataFrame[prog_code: string, air_date: string, air_time: string, air_day: int, cond_6: int]

#### Combining DataFrames and completing the conditions that require data from separate DataFrames

In [0]:
# Combine the per-source flags into one program-level frame, and derive cond_5 and cond_4,
# the two conditions that need columns from more than one source.

# Combine demographic flags with reference data. The left join keeps every (household, device)
# reference row, even when the household has no demographic record.
joined_demo_ref_df = ref_data_df_2.join(demographic_data_df_2, on='household_id', how='left')

# NOTE(review): demographic_data_df_2 was rebound after its `.cache()` call, so this unpersist
# targets a frame that was never cached. Confirm the caching intent.
demographic_data_df_2.unpersist() #unpersist demographic dataframe
# NOTE(review): this caches the raw join. The name is rebound below, so the later
# `joined_demo_ref_df.unpersist()` targets the derived frame, not this cached one.
joined_demo_ref_df.cache() #cache joined dataframe

# cond_5: the device belongs to a household with below-average income AND more than 3 devices.
# Devices without demographic data get null lower_than_avg, which yields 0 here.
joined_demo_ref_df = joined_demo_ref_df.withColumn('cond_5', when((col('lower_than_avg') == 1) &
                                                                 (col('more_than_3_dev') == 1), 1).otherwise(0)) \
                                      .select('device_id', 'cond_2', 'cond_3', 'cond_5', 'large_family')

# Devices with no demographic match carry nulls in the flag columns; treat them as 0
joined_demo_ref_df = joined_demo_ref_df.na.fill(0)

# Attach device-level flags to each (device, program) viewing row, then drop the device key
joined_demo_ref_viewing_df = viewing_data_df_2.join(joined_demo_ref_df, on='device_id', how='left').drop('device_id').na.fill(0).distinct()

joined_demo_ref_df.unpersist() #unpersist joined (see NOTE above: not the cached frame)
# NOTE(review): this frame is unpersisted again below before any action runs, so this cache is never used.
joined_demo_ref_viewing_df.cache()

# Attach viewing-derived flags to every program. Programs with no viewing rows get nulls,
# which the fillna(0) at the end of the cell turns into 0.
combined_df = program_data_df_2.join(joined_demo_ref_viewing_df, on='prog_code', how='left').distinct()

joined_demo_ref_viewing_df.unpersist()
# Cached because the collect() below is the first action over it, and the cond_4 step reuses it
combined_df.cache()

# Program codes watched at least once by a large-family household
# NOTE(review): collect() pulls every such code to the driver, and isin() below embeds the list
# in the query plan. For large lists, a join on prog_code scales better.
large_family_shows = combined_df.select('prog_code', 'large_family').filter(col('large_family') == 1).select('prog_code').distinct().collect()
large_family_shows = [row.prog_code for row in large_family_shows]

# cond_4: the program aired in the weekend slot (weekend_shows, from the program cell)
# and was watched by a large family
combined_df = combined_df.withColumn('cond_4', when((col('prog_code').isin(weekend_shows)) & (col('prog_code').isin(large_family_shows)), 1). \
                                     otherwise(0)).drop('air_date', 'air_time', 'air_day', 'large_family').fillna(0).distinct()

#### Counting conditions

In [0]:
# Count the distinct programs flagged by each of the six conditions (one Spark job each)
cond_counts = [
    combined_df.filter(col(f'cond_{i}') == 1).select('prog_code').distinct().count()
    for i in range(1, 7)
]
num_cond_1, num_cond_2, num_cond_3, num_cond_4, num_cond_5, num_cond_6 = cond_counts

for cond_number, cond_count in enumerate(cond_counts, start=1):
    print(f"number of programs that answer condition {cond_number}: ", cond_count)

number of programs that answer condition 1:  9135
number of programs that answer condition 2:  235426
number of programs that answer condition 3:  180058
number of programs that answer condition 4:  66047
number of programs that answer condition 5:  255486
number of programs that answer condition 6:  33648


## 1.2

## Question 1.2_1
<br>

### An alternative approach to the problem

- Start off by working with each dataset's schema individually, performing as many of the necessary operations as possible on the separate tables.
- Establish clear logical connections between the different datasets, and join them only when necessary. <br>
<br>

### Pros & Cons

**Pro 1:** If we combine all the data into one table early on, it is very easy to work with, because every attribute lives in a single table: we can access all the necessary properties of each record without switching between tables and joining them. <br>
**Pro 2:** Combining all the data into one big table means we use only that table throughout the entire process. We can therefore cache/persist it once, early on, which makes memory management much simpler than juggling many cached tables.

**Con 1:** Joining all the tables while they still contain all their records (before filtering anything out) can be computationally very heavy. In addition, the combined table naturally has many more records, so every operation, such as filtering, must scan all the records of the big table, even when one of the smaller tables would have been sufficient. <br>
**Con 2:** Increased complexity: joining all the data into one table requires mapping and reconciling different data structures, formats, aliases, and naming conventions. This can become very complex for datasets that span a large and diverse set of properties.

## 1.2_2

#### Finding the program codes of all malicious programs

In [0]:
# A program is "malicious" when it satisfies at least 4 of the 6 conditions.
# Each cond_N column is a 0/1 flag, so their sum is the number of conditions met.
conditions_met = col('cond_1')
for cond_idx in range(2, 7):
    conditions_met = conditions_met + col(f'cond_{cond_idx}')

malicious_progs_df = (
    combined_df
    .filter(conditions_met >= 4)
    .select('prog_code')
    .distinct()
    .orderBy('prog_code', ascending=True)
)

# Cached because it is used twice: by toPandas() below and by .show() further down
malicious_progs_df.cache()

# Pandas copy of the result, displayed in the next cell
malicious_progs_pandas = malicious_progs_df.toPandas()

#### Displaying all malicious programs

In [0]:
# Render the full malicious-program table with the notebook's rich `display`,
# so it can be saved manually from the rendered output.
display(malicious_progs_pandas)

prog_code
EP000009540005
EP000018936972
EP000018936974
EP000018936997
EP000018936999
EP000018937001
EP000018937003
EP000018937005
EP000018937032
EP000018937034


#### Showing the first 150 malicious programs (ordered by program code)

In [0]:
# Print the first 150 malicious program codes (ordered by prog_code) without truncating values
malicious_progs_df.show(n=150, truncate=False)

+--------------+
|prog_code     |
+--------------+
|EP000009540005|
|EP000018936972|
|EP000018936974|
|EP000018936997|
|EP000018936999|
|EP000018937001|
|EP000018937003|
|EP000018937005|
|EP000018937032|
|EP000018937034|
|EP000018937036|
|EP000018937038|
|EP000018937060|
|EP000018937062|
|EP000018937064|
|EP000018937088|
|EP000018937090|
|EP000018937092|
|EP000018937094|
|EP000018937116|
|EP000018937118|
|EP000018937120|
|EP000018937122|
|EP000018937124|
|EP000018937152|
|EP000018937154|
|EP000018937156|
|EP000018937158|
|EP000018937180|
|EP000018937182|
|EP000018937184|
|EP000018937186|
|EP000018937188|
|EP000018937215|
|EP000018937217|
|EP000018937219|
|EP000018937221|
|EP000018937243|
|EP000018937245|
|EP000018937247|
|EP000018937249|
|EP000018937271|
|EP000018937273|
|EP000018937275|
|EP000018937277|
|EP000018937279|
|EP000018937306|
|EP000018937308|
|EP000018937310|
|EP000018937312|
|EP000018937336|
|EP000037100945|
|EP000041755641|
|EP000041755644|
|EP000041755654|
|EP00004175565