In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Build (or reuse) a local Spark session. getOrCreate() keeps this cell safe to
# re-run: constructing SparkContext('local') a second time in the same kernel
# fails with "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master('local').getOrCreate()
# Keep `sc` defined for any cell that works with the SparkContext directly.
sc = spark.sparkContext

In [2]:
# Load the AP sample CSV: the first row is the header, column types are
# inferred, and empty fields are read as null.
df_ap = spark.read.csv(
    'ap_data/18676_data_sample2.csv',
    sep=",",
    nullValue="",
    header=True,
    inferSchema=True,
)

In [3]:
from pyspark.sql.functions import isnan, when, count, col, trim
# Remove every '.' character from the column names
dotless_names = [name.replace('.', '') for name in df_ap.columns]
df_ap = df_ap.toDF(*dotless_names)

# For each column, count the entries that are NaN or null and show the
# result as a one-row pandas frame
missing_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df_ap.columns
]
df_ap.select(missing_counts).toPandas()

Unnamed: 0,customer,id,erm__serial_number,AP Client Count Average,AP Client Count Rate Average,AP Client Count Std Dev,AP Client Count Total,AP Client Count Count,AP Client Count Max,5 GHz Client Count Average,...,Unique Clients,Average TX Kbps,Average RX Kbps,Average Total Kbps,Average Total RSSI,Average Total SNR,Average Total MCS,Average Total Bandwidth,Average Total LinkQuality,Average Total TxReties
0,0,0,0,0,0,0,0,0,126,126,...,99999,99999,99999,99999,99999,99999,99999,99999,99999,99999


In [4]:
import pyspark.sql.functions as F

# Drop columns whose values are all null
def drop_null_columns(df):
    """Return ``df`` without the columns whose every value is null.

    The row total and each column's non-null count are computed in a single
    aggregation, so the data is scanned once instead of once for ``count()``
    and again for the per-column null counts.

    Args:
        df: Spark DataFrame to prune.

    Returns:
        A DataFrame with the all-null columns removed. ``df`` is returned
        unchanged when it has no rows: an empty frame says nothing about any
        column, and dropping all of them would throw away the schema.
    """
    names = df.columns
    # Results are read back by position, so no alias can clash with a column name.
    stats = df.agg(F.count(F.lit(1)), *[F.count(F.col(c)) for c in names]).first()
    total_rows, non_null_counts = stats[0], stats[1:]

    if total_rows == 0:
        return df

    # F.count(<column>) counts non-null entries only, so 0 means "all null".
    to_drop = [c for c, n in zip(names, non_null_counts) if n == 0]
    return df.drop(*to_drop)


df_ap = drop_null_columns(df_ap)

In [5]:
# Load the February 2021 WiFi CIR file (header row, inferred types, empty
# string used as both the NaN and the null marker), then turn every 'No'
# entry into a null.
df_wifi = (
    spark.read
    .csv(
        'Wifi_CIR/WiFi_CIR_February_2021.csv',
        sep=",",
        header=True,
        inferSchema=True,
        nanValue='',
        nullValue='',
    )
    .na.replace('No', None)
)

In [6]:
# Rename the AP serial-number column to 'SERIAL_NO' ahead of merging the two data frames
df_ap = df_ap.withColumnRenamed(existing='erm__serial_number', new='SERIAL_NO')

In [7]:
def _ap_columns_with(fragment):
    """Return the df_ap column names that contain ``fragment`` (case-sensitive)."""
    return [name for name in df_ap.columns if fragment in name]


# Group the AP column names by the substring found in each name.
# The groups overlap: e.g. any 'Count Average' name is also in count_columns
# and average_columns.
max_columns = _ap_columns_with('Max')
average_columns = _ap_columns_with('Average')
rate_average_columns = _ap_columns_with('Rate Average')
std_columns = _ap_columns_with('Std Dev')
total_columns = _ap_columns_with('Total')
count_columns = _ap_columns_with('Count')
count_average = _ap_columns_with('Count Average')

In [14]:
import pyspark.sql.functions as F

# Columns to leave out of the merged frame: every WiFi column, the AP
# 'customer'/'id' columns, and the total / count / rate-average groups.
# The join key and the count-average columns are always kept. Filtering against
# a set (rather than list.remove, which deletes only the first occurrence)
# keeps a count-average column even if it is listed in more than one group.
never_dropped = {'SERIAL_NO', *count_average}
drop_candidates = df_wifi.columns + ['customer', 'id'] + total_columns + count_columns + rate_average_columns
columns_to_drop = [c for c in dict.fromkeys(drop_candidates) if c not in never_dropped]

# One row per serial number seen in the WiFi data, labelled 1 (a call-in).
# A serial that appears on several WiFi rows would otherwise repeat each
# matching AP row in the left join. Only the key is carried over because every
# other WiFi column is dropped after the join anyway.
call_ins = df_wifi.select('SERIAL_NO').dropDuplicates().withColumn('label', F.lit(1))

# Left join keeps every AP row. fillna(0) sets 'label' to 0 for rows without a
# call-in; note that it also zero-fills nulls in every other numeric column.
df = df_ap.join(call_ins, 'SERIAL_NO', 'left').fillna(0).drop(*columns_to_drop)
df.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|  519|
|    0|99563|
+-----+-----+



In [36]:
# Maximum of every column except the first and last ones (presumably the
# SERIAL_NO key and the label -- confirm against the join cell). All maxima
# come from one aggregation instead of one Spark job per column.
feature_columns = df.columns[1:-1]
maxes = list(df.agg(*[F.max(df[c]) for c in feature_columns]).first()) if feature_columns else []

In [37]:
# Minimum of every column except the first and last ones (presumably the
# SERIAL_NO key and the label -- confirm against the join cell). All minima
# come from one aggregation instead of one Spark job per column.
feature_columns = df.columns[1:-1]
mins = list(df.agg(*[F.min(df[c]) for c in feature_columns]).first()) if feature_columns else []

In [38]:
# Average of every column except the first and last ones (presumably the
# SERIAL_NO key and the label -- confirm against the join cell). All averages
# come from one aggregation instead of one Spark job per column.
feature_columns = df.columns[1:-1]
averages = list(df.agg(*[F.avg(df[c]) for c in feature_columns]).first()) if feature_columns else []

In [57]:
import csv
# Header of the summary file. The first field was previously misspelled
# 'feauture'; anything that reads this file by that header must be updated.
fields = ['feature', 'min', 'max', 'avg']
rows = [mins, maxes, averages]

filename = "AP_WiFi_Features.csv"

# Write one line per feature column: name, min, max, avg.
# newline='' is what the csv module requires of the file object; without it
# extra blank rows appear on platforms that translate '\n' to '\r\n'.
with open(filename, 'w', newline='') as csvfile:
    csvwriter = csv.writer(csvfile)

    # Header row
    csvwriter.writerow(fields)

    # zip pairs each feature name with its statistics, so no loop index is
    # needed (the old index was named `count`, which shadowed the `count`
    # function imported from pyspark.sql.functions).
    csvwriter.writerows(zip(df.columns[1:-1], *rows))