# Distributed Data Management Project

## Part 1

#### Assignment:
The assignment as a whole covers loading the data, analyzing and understanding it, drawing
conclusions, designing a suitable distributed database, and implementing the design using Spark. We will
break the task down into several parts.

In [None]:
%fs
ls "/mnt/ddscoursedatastorage/fwm-stb-data"

path,name,size,modificationTime
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/Daily program data/,Daily program data/,0,0
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/demographic/,demographic/,0,0
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/noam_testing_write/,noam_testing_write/,0,1686123319000
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/refxml/,refxml/,0,0
dbfs:/mnt/ddscoursedatastorage/fwm-stb-data/rpt-prog-data/,rpt-prog-data/,0,0


### Spam Detection:
Backstory: “Evil Media Association”, the largest media association, has inserted malicious and false information into our files out of fear that we will surpass it.<br>
The malicious records are in the “Daily Program Data”. Use Spark to evaluate the following 6 conditions for each record.<br> If 4 or more of the conditions apply to a record, it is considered a malicious record:<br>
1. The prog code was viewed by a device with fewer than 5 average daily events. <br>
2. The prog code was watched by a device from a DMA whose name contains the letter [‘z’] (case-insensitive).<br>
3. The program was watched by a family with fewer than 3 adults and a net worth higher than 8 (both exclusive).<br>
4. The prog code was aired (at least once) between Friday at 6PM and Saturday at 7PM (both inclusive), and at least one household of size 8 or more (inclusive) watched the program. <br>
5. The prog code was watched (at least once) by a device from a household with more than 3 devices (exclusive) whose income is lower than the average household income in the data.<br>
6. The program contains at least one of the genres [‘Talk’, ‘Politics’, ‘News’, ‘Community’, ‘Crime’] and has a duration of more than 35 minutes (exclusive).
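
The decision rule above can be sketched in plain Python, independent of Spark. The function name `is_malicious` and the per-record list of boolean flags are assumptions made for illustration only; the notebook itself works with sets of prog codes rather than per-record flags.

```python
# Hypothetical helper: one boolean per condition, in the order listed above.
def is_malicious(condition_flags, threshold=4):
    """Return True when at least `threshold` of the six conditions hold."""
    if len(condition_flags) != 6:
        raise ValueError("expected exactly 6 condition flags")
    return sum(bool(flag) for flag in condition_flags) >= threshold


# Conditions 1, 2, 5 and 6 hold: four conditions, so the record is flagged.
print(is_malicious([True, True, False, False, True, True]))
# Only three conditions hold: the record is not flagged.
print(is_malicious([True, False, False, True, True, False]))
```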



#### Loading the Data:

In [None]:
from pyspark.sql.types import *

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my project 1").getOrCreate()
sc = spark.sparkContext

# Read a CSV into a dataframe
# There is a smarter version, that will first check if there is a Parquet file and use it
def load_PD_file(filename_or_dir, schema) :
    dataPath = "/mnt/ddscoursedatastorage/fwm-stb-data/" + filename_or_dir
    df = spark.read.format("csv")\
      .option("header","false")\
      .option("delimiter", "|")\
      .schema(schema)\
      .load(dataPath)
    return df
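
To make the reader options concrete, here is a small plain-Python sketch of how a header-less, pipe-delimited line lines up with an explicit list of column names. The sample line, the shortened column list, and the helper `parse_pipe_lines` are made up for illustration and are not taken from the data or from Spark.

```python
import csv
import io

# Hypothetical, shortened column list; the real schemas are defined in later cells.
columns = ["prog_code", "title", "genre"]

# Made-up sample line in a pipe-delimited, header-less layout.
raw = "EP0001|Some Show|News,Talk\n"


def parse_pipe_lines(text, names):
    """Map each pipe-delimited line to a dict keyed by the given column names."""
    reader = csv.reader(io.StringIO(text), delimiter="|")
    return [dict(zip(names, row)) for row in reader]


rows = parse_pipe_lines(raw, columns)
print(rows[0]["genre"])
```

Because the file has no header row, the column names come only from the schema, so the order of fields in the schema has to match the order of fields in the file.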

In [None]:
df1 = spark.read.csv("/mnt/ddscoursedatastorage/dds-students/test.csv")

In [None]:
ref_data = spark.read.parquet('/ref_data_raw').withColumnRenamed("_device-id","device_id")\
                                                .withColumnRenamed("_dma","dma")\
                                                .withColumnRenamed("_dma-code","dma_code")\
                                                .withColumnRenamed("_household-id","household_id")\
                                                .withColumnRenamed("_household-type","household_type")\
                                                .withColumnRenamed("_system-type","system_type")\
                                                .withColumnRenamed("_zipcode","zipcode")
ref_data_count = ref_data.count()

In [None]:
daily_prog_schema =  StructType([StructField('prog_code',StringType()),
                     StructField('title',StringType()),
                     StructField('genre',StringType()),
                     StructField('air_date',StringType()),
                     StructField('air_time',StringType()),
                     StructField('Duration',FloatType())
                                       ])
daily_prog_data = load_PD_file("Daily program data/" , daily_prog_schema  )

daily_prog_data.count()

Out[5]: 13194849

In [None]:
viewing_data = spark.read.parquet('/sample_viewing_2_5percent')

In [None]:
demographic_schema =  StructType([StructField('household_id',StringType()),
                      StructField('household_size',IntegerType()),
                      StructField('num_adults',IntegerType()),
                      StructField('num_generations',IntegerType()),
                      StructField('adult_range',StringType()),
                      StructField('marital_status',StringType()),
                      StructField('race_code',StringType()),
                      StructField('presence_children',StringType()),
                      StructField('num_children',IntegerType()),
                      StructField('age_children',StringType()), #format like range - 'bitwise'
                      StructField('age_range_children',StringType()),
                      StructField('dwelling_type',StringType()),
                      StructField('home_owner_status',StringType()),
                      StructField('length_residence',IntegerType()),
                      StructField('home_market_value',StringType()),
                      StructField('num_vehicles',IntegerType()),
                      StructField('vehicle_make',StringType()),
                      StructField('vehicle_model',StringType()),
                      StructField('vehicle_year',IntegerType()),
                      StructField('net_worth',IntegerType()),
                      StructField('income',StringType()),
                      StructField('gender_individual',StringType()),
                      StructField('age_individual',IntegerType()),
                      StructField('education_highest',StringType()),
                      StructField('occupation_highest',StringType()),
                      StructField('education_1',StringType()),
                      StructField('occupation_1',StringType()),
                      StructField('age_2',IntegerType()),
                      StructField('education_2',StringType()),
                      StructField('occupation_2',StringType()),
                      StructField('age_3',IntegerType()),
                      StructField('education_3',StringType()),
                      StructField('occupation_3',StringType()),
                      StructField('age_4',IntegerType()),
                      StructField('education_4',StringType()),
                      StructField('occupation_4',StringType()),
                      StructField('age_5',IntegerType()),
                      StructField('education_5',StringType()),
                      StructField('occupation_5',StringType()),
                      StructField('polit_party_regist',StringType()),
                      StructField('polit_party_input',StringType()),
                      StructField('household_clusters',StringType()),
                      StructField('insurance_groups',StringType()),
                      StructField('financial_groups',StringType()),
                      StructField('green_living',StringType())
                                       ])

demographic_data = load_PD_file("demographic/" , demographic_schema  )  

display(demographic_data.limit(1))


household_id,household_size,num_adults,num_generations,adult_range,marital_status,race_code,presence_children,num_children,age_children,age_range_children,dwelling_type,home_owner_status,length_residence,home_market_value,num_vehicles,vehicle_make,vehicle_model,vehicle_year,net_worth,income,gender_individual,age_individual,education_highest,occupation_highest,education_1,occupation_1,age_2,education_2,occupation_2,age_3,education_3,occupation_3,age_4,education_4,occupation_4,age_5,education_5,occupation_5,polit_party_regist,polit_party_input,household_clusters,insurance_groups,financial_groups,green_living
15,2,2,1,100000000,S,B,,,0,0,S,O,5,E,,,,,6,4,M,60,4,,,,,,,,,,,,,,,,,D,443,02C3,08C3,


In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, lower, when, dayofweek, to_date, lit, avg, from_unixtime, unix_timestamp, array_contains, split, first, countDistinct
import os

#### Preprocessing - first approach
Join the 4 datasets into a single DataFrame.

In [None]:
ref_data_naive = ref_data.select("device_id", "dma","household_id")
daily_prog_data_naive = daily_prog_data.drop("title")
viewing_data_naive = viewing_data.drop("station_num", "mso_code")
demographic_data_naive = demographic_data.select("household_id", "income", "num_adults", "net_worth", "household_size")

#adding the average number of daily events for each device
viewing_data_naive.createOrReplaceTempView("viewing_data_naive")
viewing_data_naive_Davg = spark.sql("""SELECT device_id , 1.0*count(*)/count(distinct event_date) AS avg_num FROM viewing_data_naive group by device_id""")
viewing_data_naive_update = viewing_data_naive.join(viewing_data_naive_Davg, how="inner", on="device_id")

#adding a column of the day of the week
daily_prog_data_naive_days = daily_prog_data_naive.withColumn("air_date", daily_prog_data_naive["air_date"].cast(StringType()))
daily_prog_data_naive_update = daily_prog_data_naive_days.withColumn('day_of_week', dayofweek(from_unixtime(unix_timestamp('air_date', 'yyyyMMdd'))))\
            .drop("air_date")

#converting the income column to numbers: letter codes A-D map to 10-13, other values are cast to integers
demographic_data_naive_update = demographic_data_naive.withColumn("update_income", \
                                when((demographic_data_naive.income=='A'), lit(10))\
                                .when((demographic_data_naive.income=='B'), lit(11))\
                                .when((demographic_data_naive.income=='C'), lit(12))\
                                .when((demographic_data_naive.income=='D'), lit(13))\
                                .otherwise(lit(demographic_data_naive.income.cast(IntegerType()))))
avg_income_naive = demographic_data_naive_update.where(demographic_data_naive_update.income != "null").select(avg("update_income")).first()[0]

#adding a column with the number of devices in each household
ref_data_naive_totalD = ref_data_naive.groupBy("household_id").agg(countDistinct("device_id").alias("total_devices")).select("household_id", "total_devices")
ref_data_naive_update = ref_data_naive_totalD.join(ref_data_naive, how="inner", on="household_id")

#join all the tables into one DataFrame
all_data = ref_data_naive_update.join(demographic_data_naive_update, how="inner", on="household_id")\
    .join(viewing_data_naive_update, how="inner", on="device_id")\
    .join(daily_prog_data_naive_update, how="inner", on="prog_code")


#### Preprocessing - second approach
For each condition, we found the program IDs that met the specific condition. After performing this process for all 6 conditions, we found the program IDs that met 4 or more conditions (= spam).

In [None]:
#1
viewing_data_DID_EDATE = viewing_data.select("device_id","event_date")
viewing_data_PC_DID = viewing_data.select("prog_code", "device_id").cache()#1 and 2 and 4

#2
ref_data_DID_DMA = ref_data.select("device_id", "dma")

#3
dd_f = demographic_data.select("household_id", "num_adults", "net_worth").na.drop()
ref_data_HID_DID = ref_data.select("household_id", "device_id") #AND 4, 5

#4
dd_HID_HS = demographic_data.select("household_id", "household_size").na.drop()
daily_prog_data_DAYS = daily_prog_data.select("prog_code", "air_date", "air_time")\
       .withColumn("air_date", daily_prog_data["air_date"].cast(StringType()))
daily_prog_data_DAYS = daily_prog_data_DAYS.withColumn('day_of_week', dayofweek(from_unixtime(unix_timestamp('air_date', 'yyyyMMdd'))))\
            .drop("air_date")

#5
dd_HID_IN = demographic_data.select("household_id", "income").where(demographic_data.income!="null")

dd_HID_IN = dd_HID_IN.withColumn("update_income", \
                                when((dd_HID_IN.income=='A'), lit(10))\
                                .when((dd_HID_IN.income=='B'), lit(11))\
                                .when((dd_HID_IN.income=='C'), lit(12))\
                                .when((dd_HID_IN.income=='D'), lit(13))\
                                .otherwise(lit(dd_HID_IN.income.cast(IntegerType())))).na.drop()
avg_income = dd_HID_IN.select(avg("update_income")).first()[0]

ref_data_totalD = ref_data_HID_DID.groupBy("household_id").agg(countDistinct("device_id").alias("total"))

#6
daily_prog_data_f = daily_prog_data.select("prog_code", "genre", "Duration")

Finding the prog_codes that meet each condition:

In [None]:
#condition 1
viewing_data_DID_EDATE.createOrReplaceTempView("viewing_data_DID_EDATE")
#devices with fewer than 5 events per day on average, counting only days on which the device had events
devices_with_less_than_5 = spark.sql("""SELECT device_id FROM viewing_data_DID_EDATE group by device_id having 1.0*count(*)/count(distinct event_date)<5 """)

condition1 = devices_with_less_than_5.join(viewing_data_PC_DID, how="inner", on="device_id").select("prog_code").distinct()
condition1.cache()
condition1.count()

Out[11]: 9138
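
As a rough illustration of the quantity computed by the SQL query above, the following plain-Python sketch divides the number of events per device by the number of distinct dates on which that device had events. The function name and the sample events are made up. Note that days on which a device has no events do not enter the denominator.

```python
from collections import defaultdict


def avg_daily_events(events):
    """events: iterable of (device_id, event_date) pairs, one per viewing event."""
    counts = defaultdict(int)
    dates = defaultdict(set)
    for device_id, event_date in events:
        counts[device_id] += 1
        dates[device_id].add(event_date)
    return {d: counts[d] / len(dates[d]) for d in counts}


# Made-up events: device "a" has 6 events on 1 day, device "b" has 4 events over 2 days.
sample = [("a", "20150101")] * 6 + [("b", "20150101")] * 2 + [("b", "20150102")] * 2
averages = avg_daily_events(sample)
low_activity = sorted(d for d, value in averages.items() if value < 5)
print(low_activity)
```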

In [None]:
#condition 2
devices_condition2 = ref_data_DID_DMA.filter(lower(ref_data_DID_DMA.dma).like('%z%')).select("device_id")
condition2 = devices_condition2.join(viewing_data_PC_DID, how="inner", on="device_id").select("prog_code").distinct()
condition2.cache()
condition2.count()

Out[12]: 235428

In [None]:
#condition 3
condition3_households = dd_f.filter((dd_f.num_adults<3) & (dd_f.net_worth>8)).select("household_id")
condition3_devices = ref_data_HID_DID.join(condition3_households, how="inner", on="household_id").select("device_id")
condition3 = condition3_devices.join(viewing_data_PC_DID, how="inner", on="device_id").select("prog_code").distinct()
condition3.cache()
condition3.count()

Out[13]: 180061

In [None]:
# condition 4 
condition4_households = dd_HID_HS.filter(dd_HID_HS.household_size>=8).select("household_id")
condition4_devices = ref_data_HID_DID.join(condition4_households, how="inner", on="household_id").select("device_id")
condition4_prog_code = condition4_devices.join(viewing_data_PC_DID, how="inner", on="device_id").select("prog_code").distinct()

# dayofweek numbering: 1 = Sunday, ..., 6 = Friday, 7 = Saturday; air_time is compared as HHmmss
prog_code_relevant_days = daily_prog_data_DAYS.filter(((daily_prog_data_DAYS.day_of_week == 6)& (daily_prog_data_DAYS.air_time >=180000))\
                                         |((daily_prog_data_DAYS.day_of_week == 7)&(daily_prog_data_DAYS.air_time <= 190000)))\
                        .select("prog_code").distinct()

condition4= prog_code_relevant_days.join(condition4_prog_code, how="inner", on="prog_code").select("prog_code").distinct()
condition4.cache()
condition4.count()

Out[14]: 67646
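
The time-window test in condition 4 can be checked by hand with a small plain-Python sketch. It reproduces Spark's `dayofweek` numbering (1 = Sunday, ..., 7 = Saturday) with the standard library. The helper names are hypothetical, and the `HHmmss` layout of `air_time` is an assumption inferred from the thresholds 180000 and 190000 used in the filter.

```python
from datetime import datetime


def spark_day_of_week(air_date):
    """Mimic Spark's dayofweek numbering: 1 = Sunday, ..., 7 = Saturday."""
    day = datetime.strptime(air_date, "%Y%m%d")
    return day.isoweekday() % 7 + 1


def in_friday_saturday_window(air_date, air_time):
    """air_date is 'yyyyMMdd'; air_time is assumed to be 'HHmmss'."""
    dow = spark_day_of_week(air_date)
    t = int(air_time)
    return (dow == 6 and t >= 180000) or (dow == 7 and t <= 190000)


# 2023-06-09 was a Friday and 2023-06-10 a Saturday.
print(in_friday_saturday_window("20230609", "180000"))
print(in_friday_saturday_window("20230610", "190001"))
```

Both boundaries are inclusive, matching the wording of the condition: Friday at exactly 18:00:00 and Saturday at exactly 19:00:00 fall inside the window.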

In [None]:
#condition 5
below_avg_HID = dd_HID_IN.filter(dd_HID_IN.update_income < avg_income).select("household_id")#no need to put distinct

#households with more than 3 devices (strictly greater, as condition 5 requires)
more_than3_devices = ref_data_totalD.filter(ref_data_totalD.total > 3).select("household_id")
more_than3_devices.cache()

condition5_households = more_than3_devices.join(below_avg_HID , how="inner", on="household_id")
condition5_households.cache()

condition5_devices = condition5_households.join(ref_data_HID_DID, how="inner", on="household_id").select("device_id").distinct()
condition5_devices.cache()
condition5 = condition5_devices.join(viewing_data_PC_DID, how="inner", on="device_id").select("prog_code").distinct()
condition5.cache()
condition5.count()

Out[15]: 255489
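
The household selection behind condition 5 combines two filters: more than 3 distinct devices, and income below the overall average. A minimal plain-Python sketch of that combination follows. All names and sample values are hypothetical, and incomes are assumed to be already converted to numbers.

```python
from collections import defaultdict


def households_over_device_limit(pairs, limit=3):
    """pairs: iterable of (household_id, device_id); keep households with > limit devices."""
    devices = defaultdict(set)
    for household_id, device_id in pairs:
        devices[household_id].add(device_id)
    return {h for h, ids in devices.items() if len(ids) > limit}


def condition5_household_ids(pairs, incomes, limit=3):
    """incomes: dict household_id -> numeric income; None values are skipped."""
    known = {h: v for h, v in incomes.items() if v is not None}
    if not known:
        return set()
    average = sum(known.values()) / len(known)
    crowded = households_over_device_limit(pairs, limit)
    return {h for h in crowded if h in known and known[h] < average}


# Made-up data: h1 has 4 devices, h2 has 3, h3 has 5; the average income is 5.
pairs = [("h1", f"d{i}") for i in range(4)]
pairs += [("h2", f"e{i}") for i in range(3)]
pairs += [("h3", f"f{i}") for i in range(5)]
incomes = {"h1": 2, "h2": 1, "h3": 12}
print(condition5_household_ids(pairs, incomes))
```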

In [None]:
#condition 6

prohibited_genres = ['Talk', 'Politics', 'News', 'Community', 'Crime']
daily_prog_data_f = daily_prog_data_f.filter(daily_prog_data_f.Duration>35)
condition6 = daily_prog_data_f.filter(
    array_contains(split(daily_prog_data_f.genre, ','), prohibited_genres[0])|
    array_contains(split(daily_prog_data_f.genre, ','), prohibited_genres[1])|
    array_contains(split(daily_prog_data_f.genre, ','), prohibited_genres[2])|
    array_contains(split(daily_prog_data_f.genre, ','), prohibited_genres[3])|
    array_contains(split(daily_prog_data_f.genre, ','), prohibited_genres[4])).select("prog_code").distinct()
condition6.cache()
condition6.count()

Out[16]: 33648
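
The genre test in condition 6 splits the genre string on commas and looks for exact matches. The plain-Python sketch below mirrors that logic under the assumption that genres are stored as a comma-separated string without surrounding spaces; the function name and the sample values are made up.

```python
PROHIBITED_GENRES = {"Talk", "Politics", "News", "Community", "Crime"}


def meets_condition6(genre, duration):
    """genre: comma-separated string; duration: minutes. Missing values never match."""
    if genre is None or duration is None:
        return False
    tokens = set(genre.split(","))  # exact tokens, no whitespace trimming
    return duration > 35 and bool(tokens & PROHIBITED_GENRES)


print(meets_condition6("News,Talk", 60.0))     # listed genre, long enough
print(meets_condition6("Newsmagazine", 60.0))  # not an exact match for 'News'
print(meets_condition6("News", 35.0))          # 35 minutes is not more than 35
```

Because the match is on whole tokens, a value such as `"Sports, News"` (with a space after the comma) would not match either, so it may be worth checking how genres are delimited in the data.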

"all_malicious_prog" - a DataFrame of all the prog_codes that satisfy 4 or more conditions

In [None]:
all_malicious_prog = condition1.unionAll(condition2).unionAll(condition3).unionAll(condition4).unionAll(condition5).unionAll(condition6)
#malicious
all_malicious_prog.cache()
all_malicious_prog = all_malicious_prog.groupBy("prog_code").count().withColumnRenamed("count", "total_conditions")
all_malicious_prog = all_malicious_prog.filter(all_malicious_prog.total_conditions >= 4).select("prog_code")

print("Number of relevant entries for each condition:")
print(f"condition 1: {condition1.count()}, condition 2: {condition2.count()}, condition 3: {condition3.count()}, condition 4: {condition4.count()}, condition 5: {condition5.count()}, condition 6: {condition6.count()}")

all_malicious_prog = all_malicious_prog.orderBy("prog_code")
display(all_malicious_prog.limit(150))

Number of relevant entries for each condition:
condition 1: 9138, condition 2: 235428, condition 3: 180061, condition 4: 67646, condition 5: 255489, condition 6: 33648
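
The union-then-count step works because each condition DataFrame holds distinct prog_codes, so the number of times a prog_code appears in the union equals the number of conditions it satisfies. A plain-Python sketch of the same idea, with a hypothetical function name and made-up sets:

```python
from collections import Counter


def prog_codes_meeting(condition_sets, threshold=4):
    """condition_sets: one collection of prog_codes per condition."""
    counts = Counter()
    for codes in condition_sets:
        counts.update(set(codes))  # de-duplicate so each condition counts once
    return sorted(code for code, n in counts.items() if n >= threshold)


# Made-up example: p1 and p2 each appear in four sets, p3 in only one.
sets = [{"p1", "p2"}, {"p1"}, {"p1", "p2"}, {"p1", "p3"}, {"p2"}, {"p2"}]
print(prog_codes_meeting(sets))
```

If any per-condition result contained duplicates, the count would overstate the number of conditions met, which is why the `distinct()` calls in the condition cells matter.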


prog_code
DVRPGMKEY
EP000000211576
EP000000211614
EP000000211639
EP000000211645
EP000000211646
EP000000211647
EP000000211648
EP000000211650
EP000000211654


In [None]:
display(all_malicious_prog) #gives the option to download all of the malicious prog_codes

prog_code
DVRPGMKEY
EP000000211576
EP000000211614
EP000000211639
EP000000211645
EP000000211646
EP000000211647
EP000000211648
EP000000211650
EP000000211654
