# Section 1 Initialization 
## Section 1.1 Copy results of IMC2020_AntennaIMDAssignment to HDFS

In [None]:
%%bash
hdfs dfs -copyFromLocal datasets/Telefonica_Antenna/XG/telefonica_antenna_liverpool_imd_pd.csv QoE/Liverpool/
hdfs dfs -copyFromLocal datasets/Telefonica_Antenna/XG/telefonica_antenna_london_imd_pd.csv QoE/London/
hdfs dfs -copyFromLocal datasets/Telefonica_Antenna/XG/telefonica_antenna_birmingham_imd_pd.csv QoE/Birmingham/

## Section 1.2 PySpark and Python modules

In [1]:
import os
os.environ["SPARK_HOME"] = '/usr/local/spark/spark-1.6.2-bin-hadoop2.6'
os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[*] --deploy-mode client --packages com.databricks:spark-csv_2.11:1.3.0 pyspark-shell"

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark import sql
from pyspark.sql import HiveContext, Window, SQLContext
from pyspark.sql.types import DoubleType, StructType, StructField, StringType
from pyspark.sql.functions import col, lit, count, sum, avg, max, array

print('starting')
conf = SparkConf().setAppName('HomeDetection')
sc = SparkContext(conf=conf)

# we need HiveContext to use Hive builtin functions:
# hive builtin functions : https://support.treasuredata.com/hc/en-us/articles/360001457367-Hive-Built-in-Aggregate-Functions
sqlContext = HiveContext(sc)

print('finished')

starting
finished


## Section 1.2 Define datasets we are going to use

In [2]:
NightHours = ['/hour=00-04/part-*', '/hour=04-08/part-*']

data_dirs = ['QoE/Liverpool/Liverpool_Jan_2020/',
             'QoE/London/London_Jan_2020/',
             'QoE/Birmingham/Birmingham_Jan_2020/']

output_files = ['HomeAntenna_Liverpool_Jan_2020.csv',
               'HomeAntenna_London_Jan_2020.csv',
               'HomeAntenna_Birmingham_Jan_2020.csv']

antenna_info_files = ['QoE/Liverpool/telefonica_antenna_liverpool_imd_pd.csv',
                      'QoE/London/telefonica_antenna_london_imd_pd.csv',
                      'QoE/Birmingham/telefonica_antenna_birmingham_imd_pd.csv']

days = ['01', '02', '03']

schema = StructType([StructField('device_id', StringType(), True),
                     StructField('antenna_id', StringType(), True),
                     StructField('time_spent', DoubleType(), True)])

# Section 2 Home antenna detection

1.     iterate over days 
2.     split each row on "tab" 
3.     create data frames from two 4hour night intervals  
3.1    dataframes have rows where each rows begins with device_id, gyration, 2 mistery values and [antena_id(lkey), time_spent]  pairs  
3.2    each device_id has multiple rows 
4.     unite both time intervals, sum times for same antennas, keep the antenna with max time_spent 
5.     append dataframes from each day to final dataframe 
6.     add antenna coordinates to each antenna from antenna_coordinates dataset  
7.     identify most common antenna_id for each ['device_id','geometry'] pair (save it to temp)
8.     filter device_ids that connected to same coordinates at least 13x in the month -> final dataset
9.     add most common antenna_id to each ['device_id', 'geometry'] pair in final dataset

In [14]:
for data_dir,output_file,antenna_info_file in zip(data_dirs,output_files,antenna_info_files):
    NightHours_00040008_final_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)
    for day in days:
        NightHours_0004 = sc.textFile(data_dir + day + NightHours[0]).map(lambda x: x.split('\t'))
        NightHours_0008 = sc.textFile(data_dir + day + NightHours[1]).map(lambda x: x.split('\t'))
        NightHours_0004_df = NightHours_0004.filter(lambda x: float(x[1]) <= 2000).flatMap(lambda x: [(x[0], x[i], x[i+1]) for i in range(5,len(x),2)])\
                                      .toDF(('device_id', 'antenna_id', 'time_spent'))
        
        NightHours_0008_df = NightHours_0008.filter(lambda x: float(x[1]) <= 2000).flatMap(lambda x: [(x[0], x[i], x[i+1]) for i in range(5,len(x),2)])\
                                      .toDF(('device_id', 'antenna_id', 'time_spent'))
        
        w = Window.partitionBy('device_id')
        NightHours_00040008_df = NightHours_0004_df.unionAll(NightHours_0008_df)
        NightHours_00040008_df = NightHours_00040008_df.groupby(['device_id', 'antenna_id']).agg(sum('time_spent').alias('time_spent'))\
                                      .withColumn('max_time_spent', max('time_spent').over(w))\
                                      .where(col('time_spent') == col('max_time_spent'))\
                                      .drop('max_time_spent')
        
        NightHours_00040008_final_df = NightHours_00040008_final_df.unionAll(NightHours_00040008_df)
    
    antenna_info = sqlContext.read.format('com.databricks.spark.csv').option('header', 'true').option('inferSchema', 'true').load(antenna_info_file)
    antenna_info = antenna_info.drop('IMDDecil')
    antenna_info = antenna_info.drop('generation')
    antenna_info = antenna_info.drop('geometry_voronoi')
    
    NightHours_00040008_final_df = NightHours_00040008_final_df.join(antenna_info,[NightHours_00040008_final_df['antenna_id']==antenna_info['lkey']], 'left').drop('lkey')
    
    
    # identification of most common antenna_id in for ['device_id','geometry'] pairs, based on solution from :
    # https://stackoverflow.com/questions/45634725/pyspark-aggregate-on-the-most-frequent-value-in-a-column
    temp = NightHours_00040008_final_df.groupby('device_id','geometry','antenna_id').count()\
                                       .withColumn('count_antenna_id', array('count', 'antenna_id'))\
                                       .groupby('device_id','geometry')\
                                       .agg(max('count_antenna_id').getItem(1).alias('antenna_id'))
    temp = temp.withColumnRenamed('device_id', 'device_idd')
    temp = temp.withColumnRenamed('geometry', 'geometryy')
    
    NightHours_00040008_final_df = NightHours_00040008_final_df.groupby(['device_id', 'geometry']).agg(count('geometry').alias('count_geometry'))\
                                                               .filter(col('count_geometry')>13)

    NightHours_00040008_final_df = NightHours_00040008_final_df.groupby(['device_id', 'geometry'])\
                                                               .agg(max('count_geometry').alias('max_count_geometry'))\
                                                               .drop('max_count_geometry')
        
    NightHours_00040008_final_df = NightHours_00040008_final_df.join(temp, [NightHours_00040008_final_df['geometry']==temp['geometryy'], NightHours_00040008_final_df['device_id']==temp['device_idd']],'left')
    NightHours_00040008_final_df = NightHours_00040008_final_df.drop('device_idd')
    NightHours_00040008_final_df = NightHours_00040008_final_df.drop('geometryy')

    # toPandas() for testing
    #NightHours_00040008_final_df.toPandas().to_csv(output_file, index=False)
    #NightHours_00040008_final_df.show(n=10)
    
    NightHours_00040008_final_df.coalesce(1).write.mode('append').format('com.databricks.spark.csv').option('header', 'true').save(output_file)

+--------------------+--------------------+-----------+
|           device_id|            geometry| antenna_id|
+--------------------+--------------------+-----------+
|KIQXVF6+HxfnllhFR...|POINT (-196320.33...|    130-649|
|QlQfCsgA9qPRiEj3M...|POINT (-196320.33...|    130-649|
|E2gk2nlCd8X5RRZzo...|POINT (-196406.78...|    120-762|
|FwK1+E7LGq3y0oVOj...|POINT (-196406.78...|    120-762|
|qE1hJ+TbmbBq7p2Xu...|POINT (-196406.78...|    120-762|
|LIKHcG2xA0wjQ8gfS...|POINT (-197214.56...|21218-11267|
|WSi5v+Ywgxs34BoKt...|POINT (-197214.56...|   110-3716|
|NdKSU96AdRJX2aNlQ...|POINT (-197525.12...|    120-429|
|PdVD9cOkvhd10vsS2...|POINT (-197525.12...|    120-429|
|IGLN9FJh+/dPYTUEU...|POINT (-197688.24...|    130-650|
+--------------------+--------------------+-----------+
only showing top 10 rows



# TEST OF "MOST COMMON ANTENNA IDENTIFICATION"

In [9]:
schema_test = StructType([StructField('device_id', StringType(), True),
                          StructField('antenna_id', StringType(), True),
                          StructField('geometry', StringType(), True)])
l = [['a','y','xx'],['a','y','xx'],['a','x','xx'],['a','x','xx'],['a','x','xx']]
rdd = sc.parallelize(l)
test = sqlContext.createDataFrame(rdd, schema_test)
test.show()

+---------+----------+--------+
|device_id|antenna_id|geometry|
+---------+----------+--------+
|        a|         y|      xx|
|        a|         y|      xx|
|        a|         x|      xx|
|        a|         x|      xx|
|        a|         x|      xx|
+---------+----------+--------+



In [11]:
aggregrated_table = test.groupby('device_id','geometry','antenna_id').count()\
.withColumn('count_antenna_id', array('count', 'antenna_id'))\
.groupby('device_id','geometry')\
.agg(max('count_antenna_id').getItem(1).alias('most_common_antenna_id'))
aggregrated_table.show()

+---------+--------+----------------------+
|device_id|geometry|most_common_antenna_id|
+---------+--------+----------------------+
|        a|      xx|                     x|
+---------+--------+----------------------+

