In [1]:
import pandas as pd
import json
import numpy
from pandas.io.json import json_normalize
from scipy.spatial.distance import cosine
import csv
# from pyspark import SparkContext
# from pyspark import SparkConf
# from pyspark.sql import SQLContext
from pyspark.sql.functions import udf #user defined function
from pyspark.sql.functions import lit, col
from pyspark.sql.types import *
from pyspark.sql import HiveContext
import ast

In [2]:
# Not needed for Spark on EC2, only for local setup
# sc = SparkContext("local", "Region Network")

In [3]:
# loading the raw data
# df = pd.read_csv('../../data/CDR/hash/sample.csv') 
# df.columns = ['index','time','source','dest','call']



time          source dest call                
1383297600000 1      1    1.445982643495844E-4
1383300000000 1      1    2.893335821627406...
1383306000000 1      1    2.170344499879484...
1383306600000 1      1    6.92190810703531E-5 
1383308400000 1      1    7.22991321747922E-5 
1383309000000 1      1    2.107372943154984E-4
1383315000000 1      1    7.395236931786679E-5
1383317400000 1      1    6.92190810703531E-5 
1383331200000 1      1    6.92190810703531E-5 
1383339600000 1      1    7.22991321747922E-5 
1383342000000 1      1    7.551623674280319E-5
1383342600000 1      1    6.92190810703531E-5 
1383265800000 1      10   2.814243229598746E-5
1383288000000 1      10   3.907339743793248E-5
1383296400000 1      10   1.028247752913052E-4
1383297000000 1      10   1.309672075872926...
1383298800000 1      10   1.551390040413683E-4
1383299400000 1      10   1.092125409370522...
1383305400000 1      10   1.569513613199081...
1383307800000 1      10   1.040236849748308...


In [5]:
# loading the raw data
# df = pd.read_csv('../../data/CDR/hash/sample.csv') 
# df.columns = ['index','time','source','dest','call']
# sqlCtx = SQLContext(sc)
# cs_df = sqlCtx.createDataFrame(df)

In [6]:

# loading the cell-proportion data: one row per cell, "proportions" holds a dict literal
# that calculate_actual_call parses and indexes by str(region id)
prop_table = pd.read_csv('../../data/CDR/hash/cell_intersect.csv', header=None)
prop_table.columns = ['cell', 'proportions']
prop_table.index = prop_table.cell
# Sort on the index, which mirrors the `cell` column. Sorting by the label 'cell' is
# ambiguous once it is both the index name and a column (newer pandas raises on it).
prop_table = prop_table.sort_index()

In [None]:


def get_cells_per_region(table, region_id):
    """
        Return the keys of the proportions dict stored for one region.

        `table` must be indexed by region id (label-based lookup) and have a
        "proportions" column holding the string repr of a dict, which is parsed
        with ast.literal_eval. get_call_data uses the returned keys as the cell
        ids to match against the "source"/"dest" columns.
    """
    # .at is a label lookup (get_value was removed in pandas 1.0); a positional
    # iloc lookup would pick the wrong row whenever region ids are not 0..n-1.
    proportions = ast.literal_eval(table.at[region_id, "proportions"])
    return proportions.keys()

def get_call_data(source, dest, cs_df, table):
    """
        Select the call records from any cell of region `source` to any cell of region `dest`.

        Parameters:
            source, dest -- region ids, looked up in `table` via get_cells_per_region
            cs_df        -- Spark DataFrame with "source" and "dest" columns
            table        -- region table passed to get_cells_per_region
        Returns the rows of cs_df whose source is one of the source region's cells
        and whose dest is one of the destination region's cells.
    """
    source_cells = get_cells_per_region(table, source)
    dest_cells = get_cells_per_region(table, dest)

    def matches_any(column, cells):
        # "col = a OR col = b ..." keeps the same comparison semantics as the
        # original query; an empty cell list matches nothing instead of
        # producing invalid SQL ("... WHERE " with no predicate).
        # Cell ids come from the local intersect CSV, not from user input.
        terms = [column + " = " + str(cell) for cell in cells]
        if not terms:
            return "false"
        return "(" + " OR ".join(terms) + ")"

    condition = matches_any("source", source_cells) + " AND " + matches_any("dest", dest_cells)
    # Filter the DataFrame argument directly rather than going through the
    # global sqlCtx and registered temp tables.
    return cs_df.filter(condition)

def calculate_actual_call(s_cell, d_cell, call, s_region, d_region, cell_table=None):
    """
        Scale one cell-to-cell call value by how much of each cell lies in the given regions.

        Used as a Spark UDF to build the "adjusted_call" column.

        Parameters:
            s_cell, d_cell -- cell ids (converted with int()) looked up in `cell_table`
            call           -- call value, converted with float()
            s_region, d_region -- region ids; looked up as str(region) in each
                              cell's parsed "proportions" dict
            cell_table     -- table indexed by cell id with a "proportions" column
                              holding a dict literal; defaults to the module-level prop_table
        Returns source_share * dest_share * float(call), or 0.0 when a cell has no
        share in the requested region or `call` cannot be converted to float.
        Raises KeyError if a cell id is missing from `cell_table`.
    """
    if cell_table is None:
        cell_table = prop_table
    # .at replaces DataFrame.get_value, which was removed in pandas 1.0
    source_prop = ast.literal_eval(cell_table.at[int(s_cell), "proportions"])
    dest_prop = ast.literal_eval(cell_table.at[int(d_cell), "proportions"])

    try:
        final = source_prop[str(s_region)] * dest_prop[str(d_region)] * float(call)
    except (KeyError, TypeError, ValueError):
        # cell does not overlap the region (KeyError) or call is not numeric:
        # this record contributes nothing to the region pair
        final = 0

    return float(final)

In [None]:
if __name__ == "__main__":
    # Driver: for each source region, sum the region-adjusted call values to every
    # destination region per time value, and write one CSV per source region.
    # NOTE(review): `sc` and `SQLContext` are not created/imported in this notebook
    # (those lines are commented out above) -- presumably supplied by a pyspark-launched kernel.
    CDR_PATH = "hdfs://ip-10-46-133-118.us-west-2.compute.internal:9000/user/root/mi-to-mi/*"
    SOURCE_REGIONS = range(1, 2)
    DEST_REGIONS = range(1, 81)

    sqlCtx = SQLContext(sc)
    # each input line is split on tabs into time, source, dest, call (all strings)
    rdd = sc.textFile(CDR_PATH).map(lambda row: row.split('\t'))
    cs_df = sqlCtx.createDataFrame(rdd, ['time', 'source', 'dest', 'call'])

    cs_df.registerTempTable("cs_df")
    sqlCtx.cacheTable("cs_df")

    # loading the region-cell data
    table = pd.read_csv('../../data/CDR/hash/intersect.csv', header=None)
    table.columns = ['region', 'proportions']
    table.index = table.region
    # Sort on the index, which mirrors the `region` column. Sorting by the label 'region'
    # is ambiguous once it is both the index name and a column (newer pandas raises on it).
    table = table.sort_index()

    # Column order and types of the result. unionAll matches columns by POSITION, so every
    # aggregate below is selected into exactly this order. time values such as
    # 1383297600000 overflow the 32-bit IntegerType, hence LongType.
    schema = StructType([
                StructField("time", LongType(), nullable=False),
                StructField("adjusted_call", DoubleType(), nullable=False),
                StructField("source_region", IntegerType(), nullable=False),
                StructField("dest_region", IntegerType(), nullable=False)
        ])

    # calculate_actual_call returns a Python float (double precision)
    udf_calls = udf(calculate_actual_call, DoubleType())

    for s in SOURCE_REGIONS:
        # start fresh per source region so each CSV only holds that region's rows
        region_network = sqlCtx.createDataFrame([], schema)
        for d in DEST_REGIONS:
            # get a subset of records for the source and dest
            subdf = get_call_data(s, d, cs_df, table)
            subdf = subdf.withColumn("source_region", lit(s))
            subdf = subdf.withColumn("dest_region", lit(d))
            print (s, d)
            # create a column with adjusted call values
            try:
                subdf = subdf.select("time", "source", "dest", "call", "source_region", "dest_region",
                                     udf_calls("source", "dest", "call", "source_region", "dest_region").alias("adjusted_call"))
            except Exception as exc:
                print ("skipping region pair %s -> %s: %s" % (s, d, exc))
                continue

            # Sum adjusted calls per time value. source_region/dest_region are constant within
            # subdf, so grouping on them is equivalent to taking their max, and a single-key agg
            # dict gives a fixed output column order (multi-key dicts are unordered in Python 2).
            subdf = (subdf.groupBy("time", "source_region", "dest_region")
                          .agg({"adjusted_call": "sum"})
                          .toDF("time", "source_region", "dest_region", "adjusted_call"))
            subdf = subdf.select(col("time").cast("long").alias("time"),
                                 "adjusted_call", "source_region", "dest_region")
            region_network = region_network.unionAll(subdf)

        region_network.toPandas().to_csv('../../data/CDR/generated/region-network_' + str(s) + '.csv')

(1, 1)
(1, 2)
(1, 3)
(1, 4)
(1, 5)
(1, 6)
(1, 7)
(1, 8)
(1, 9)
(1, 10)
(1, 11)
(1, 12)
(1, 13)
(1, 14)
(1, 15)
(1, 16)
(1, 17)
(1, 18)
(1, 19)
(1, 20)
(1, 21)
(1, 22)
(1, 23)
(1, 24)
(1, 25)
(1, 26)
(1, 27)
(1, 28)
(1, 29)
(1, 30)
(1, 31)
(1, 32)
(1, 33)
(1, 34)
(1, 35)
(1, 36)
(1, 37)
(1, 38)
(1, 39)
(1, 40)
(1, 41)
(1, 42)
(1, 43)
(1, 44)
(1, 45)
(1, 46)
(1, 47)
(1, 48)
(1, 49)
(1, 50)
(1, 51)
(1, 52)
(1, 53)
(1, 54)
(1, 55)
(1, 56)
(1, 57)
(1, 58)
(1, 59)
(1, 60)
(1, 61)
(1, 62)
(1, 63)
(1, 64)
(1, 65)
(1, 66)
(1, 67)
(1, 68)
(1, 69)
(1, 70)
(1, 71)
(1, 72)
(1, 73)
(1, 74)
(1, 75)
(1, 76)
(1, 77)
(1, 78)
(1, 79)
(1, 80)
