In [None]:
import requests

from pyspark.sql import functions as F
from pyspark.sql.functions import col, date_format, rank, to_date, unix_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, FloatType
from pyspark.sql.window import Window

In [None]:
import requests

url = 'https://wcd-de-labs-files.s3.amazonaws.com/key.json'

# Download the JSON document holding the S3 credentials.
# A timeout keeps the cell from hanging forever on a stalled connection, and
# raise_for_status() turns an HTTP error into an explicit exception here
# instead of a confusing JSON/KeyError failure a few lines later.
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()

# The first entry of the "key" list carries the key pair used for mounting.
# These values are secrets: do not print or display them.
ACCESS_KEY = data['key'][0]['access_key']
SECRET_ACCESS_KEY = data['key'][0]['secret_access_key']


In [None]:
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
  """Mount an S3 bucket at ``/mnt/<mount_folder>`` through ``dbutils.fs``.

  Any existing mount at that location is unmounted first; a failure to
  unmount is reported and otherwise ignored, after which the bucket is
  mounted.

  Args:
    access_key: AWS access key id, embedded in the ``s3a://`` URI.
    secret_key: AWS secret access key; every "/" is percent-encoded as
      "%2F" before it is embedded in the URI.
    bucket_name: Bucket name (optionally followed by a key prefix).
    mount_folder: Folder name under ``/mnt`` to mount to.

  Raises:
    Whatever ``dbutils.fs.mount`` raises if the mount itself fails.
  """
  # Percent-encode "/" so the secret can be embedded in the s3a URI.
  encoded_secret_key = secret_key.replace("/", "%2F")
  mount_point = "/mnt/%s" % mount_folder

  print("Mounting", bucket_name)

  try:
    # Unmount the data in case it was already mounted.
    dbutils.fs.unmount(mount_point)
  except Exception:
    # If it fails to unmount it most likely wasn't mounted in the first place.
    # Only ordinary errors are swallowed: KeyboardInterrupt / SystemExit must
    # propagate so that cancelling the cell does not go on to mount anyway.
    print("Directory not unmounted: ", mount_folder)

  # Lastly, mount our bucket.
  dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, bucket_name), mount_point)
  print("The bucket", bucket_name, "was mounted to", mount_folder, "\n")

In [None]:
# Mount the CDR dataset prefix at /mnt/cdr using the credentials fetched above.
mount_s3_bucket(
  access_key=ACCESS_KEY,
  secret_key=SECRET_ACCESS_KEY,
  bucket_name="weclouddata/datasets/telecom/CDR",
  mount_folder="cdr",
)

Mounting weclouddata/datasets/telecom/CDR
/mnt/cdr has been unmounted.
The bucket weclouddata/datasets/telecom/CDR was mounted to cdr 



In [None]:
%fs ls /mnt/cdr/cdr_by_grid_december/

path,name,size,modificationTime
dbfs:/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-01.txt,sms-call-internet-mi-2013-12-01.txt,298715145,1534382855000
dbfs:/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-02.txt,sms-call-internet-mi-2013-12-02.txt,341919663,1534382855000
dbfs:/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-03.txt,sms-call-internet-mi-2013-12-03.txt,353947238,1534382855000
dbfs:/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-04.txt,sms-call-internet-mi-2013-12-04.txt,352032545,1534382855000
dbfs:/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-05.txt,sms-call-internet-mi-2013-12-05.txt,353519447,1534382855000
dbfs:/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-06.txt,sms-call-internet-mi-2013-12-06.txt,354028475,1534383133000
dbfs:/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-07.txt,sms-call-internet-mi-2013-12-07.txt,307172220,1534383160000
dbfs:/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-08.txt,sms-call-internet-mi-2013-12-08.txt,293663723,1534383165000
dbfs:/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-09.txt,sms-call-internet-mi-2013-12-09.txt,338963272,1534383165000
dbfs:/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-10.txt,sms-call-internet-mi-2013-12-10.txt,349269739,1534383165000


In [None]:
# Daily input files for 2013-12-01 through 2013-12-05.
cdrFiles = [
  f'/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-{i:02}.txt'
  for i in range(1, 6)
]

# Explicit schema: every column is nullable; names keep the hyphens here.
cdrSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("square-id", IntegerType()),
        ("time-interval", LongType()),
        ("country-code", IntegerType()),
        ("sms-in-activity", FloatType()),
        ("sms-out-activity", FloatType()),
        ("call-in-activity", FloatType()),
        ("call-out-activity", FloatType()),
        ("internet-activity", FloatType()),
    ]
])

# Read all files as tab-delimited text with the header option enabled.
cdr = (
    spark.read
         .option('header', 'true')
         .option('delimiter', '\t')
         .schema(cdrSchema)
         .csv(cdrFiles)
)

cdr.show(5)

+---------+-------------+------------+---------------+----------------+----------------+-----------------+-----------------+
|square-id|time-interval|country-code|sms-in-activity|sms-out-activity|call-in-activity|call-out-activity|internet-activity|
+---------+-------------+------------+---------------+----------------+----------------+-----------------+-----------------+
|        1|1385852400000|          46|           null|            null|            null|             null|      0.026137425|
|        1|1385853000000|          39|     0.16513683|      0.17639945|     0.030875085|      0.027300464|        13.330858|
|        1|1385853600000|           0|    0.029087774|     0.027300464|            null|             null|             null|
|        1|1385853600000|          39|     0.18645109|      0.13658783|     0.054600928|             null|        11.329553|
|        1|1385854200000|          39|     0.21965227|      0.38112897|      0.08252566|       0.13596356|       13.1661625|


In [None]:
from pyspark.sql.functions import col
# Replace every '-' in the column names with '_' (e.g. square-id -> square_id).
columns = cdr.columns
renamed_columns = []
for original_name in columns:
  renamed_columns.append(col(original_name).alias(original_name.replace('-', '_')))
cdr = cdr.select(*renamed_columns)
cdr.show(5)

+---------+-------------+------------+---------------+----------------+----------------+-----------------+-----------------+
|square_id|time_interval|country_code|sms_in_activity|sms_out_activity|call_in_activity|call_out_activity|internet_activity|
+---------+-------------+------------+---------------+----------------+----------------+-----------------+-----------------+
|        1|1385852400000|          46|           null|            null|            null|             null|      0.026137425|
|        1|1385853000000|          39|     0.16513683|      0.17639945|     0.030875085|      0.027300464|        13.330858|
|        1|1385853600000|           0|    0.029087774|     0.027300464|            null|             null|             null|
|        1|1385853600000|          39|     0.18645109|      0.13658783|     0.054600928|             null|        11.329553|
|        1|1385854200000|          39|     0.21965227|      0.38112897|      0.08252566|       0.13596356|       13.1661625|


In [None]:
# Append sms_ratio = sms_in_activity / sms_out_activity to the existing columns.
sms_ratio_expr = col('sms_in_activity') / col('sms_out_activity')
cdr = cdr.select(col('*'), sms_ratio_expr.alias('sms_ratio'))
cdr.show(5)

+---------+-------------+------------+---------------+----------------+----------------+-----------------+-----------------+------------------+
|square_id|time_interval|country_code|sms_in_activity|sms_out_activity|call_in_activity|call_out_activity|internet_activity|         sms_ratio|
+---------+-------------+------------+---------------+----------------+----------------+-----------------+-----------------+------------------+
|        1|1385852400000|          46|           null|            null|            null|             null|      0.026137425|              null|
|        1|1385853000000|          39|     0.16513683|      0.17639945|     0.030875085|      0.027300464|        13.330858|0.9361527194661042|
|        1|1385853600000|           0|    0.029087774|     0.027300464|            null|             null|             null|1.0654681351520536|
|        1|1385853600000|          39|     0.18645109|      0.13658783|     0.054600928|             null|        11.329553|1.3650637435

In [None]:
from pyspark.sql.functions import to_date, date_format
# Derive a 'yyyy/MM/dd' string column `date` from `time_interval`, which is
# divided by 1000 (milliseconds -> seconds) before the cast to timestamp.
# NOTE(review): the sample output contains 2013/11/30 rows although the files
# loaded are named 2013-12-01..05 -- presumably the day boundary follows the
# Spark session time zone rather than the data's local time; confirm which
# one is intended.
interval_start = (col("time_interval") / 1000).cast("timestamp")
cdr = cdr.withColumn("date", date_format(to_date(interval_start), 'yyyy/MM/dd'))
cdr.show(5)

+---------+-------------+------------+---------------+----------------+----------------+-----------------+-----------------+------------------+----------+
|square_id|time_interval|country_code|sms_in_activity|sms_out_activity|call_in_activity|call_out_activity|internet_activity|         sms_ratio|      date|
+---------+-------------+------------+---------------+----------------+----------------+-----------------+-----------------+------------------+----------+
|        1|1385852400000|          46|           null|            null|            null|             null|      0.026137425|              null|2013/11/30|
|        1|1385853000000|          39|     0.16513683|      0.17639945|     0.030875085|      0.027300464|        13.330858|0.9361527194661042|2013/11/30|
|        1|1385853600000|           0|    0.029087774|     0.027300464|            null|             null|             null|1.0654681351520536|2013/11/30|
|        1|1385853600000|          39|     0.18645109|      0.13658783

#### 4.4 Calculate summary statistics at the square_id level

Create a DataFrame that calculates the following aggregations: 
*  - `sms_in_activity` ==> mean
*  - `sms_out_activity` ==> mean
*  - `call_out_activity` ==> min
*  - `internet_activity` ==> max
*  - `all records` ==> count

In [None]:
# Aggregation applied per square_id: column name -> aggregate function name.
# The "*" key with "count" counts all records in the group.
square_aggregations = {
    "sms_in_activity": "mean",
    "sms_out_activity": "mean",
    "call_out_activity": "min",
    "internet_activity": "max",
    "*": "count",
}

cdr_agg = cdr.groupby("square_id").agg(square_aggregations)

In [None]:
from pyspark.sql.functions import *

agg_cols = ['sms_in_activity', 'sms_out_activity','internet_activity','call_in_activity','call_out_activity']

# Per-square minimum and maximum of every activity column.
# The Spark aggregates are referenced through the `F` namespace so this cell
# does not depend on a wildcard import having replaced Python's built-in
# min() and max() with the Spark column functions.
min_exprs = [F.min(F.col(c)).alias(c + '_min') for c in agg_cols]
max_exprs = [F.max(F.col(c)).alias(c + '_max') for c in agg_cols]

cdr.groupby("square_id").agg(*min_exprs, *max_exprs).show(5)

+---------+-------------------+--------------------+---------------------+--------------------+---------------------+-------------------+--------------------+---------------------+--------------------+---------------------+
|square_id|sms_in_activity_min|sms_out_activity_min|internet_activity_min|call_in_activity_min|call_out_activity_min|sms_in_activity_max|sms_out_activity_max|internet_activity_max|call_in_activity_max|call_out_activity_max|
+---------+-------------------+--------------------+---------------------+--------------------+---------------------+-------------------+--------------------+---------------------+--------------------+---------------------+
|     1088|        0.020871513|         0.020871513|          0.020871513|         0.020871513|          0.020871513|         0.54749453|           2.6005592|            11.037694|           0.8147374|            1.1373159|
|     1238|        0.028927796|         0.028927796|          0.028927796|         0.028927796|         

In [None]:
def _count_non_null(activity_col, count_name):
  """Count, per (country_code, date), the rows of `cdr` where `activity_col` is not null.

  Returns a DataFrame with columns country_code, date and `count_name`,
  sorted by country_code and date.
  """
  return (cdr.filter(f'{activity_col} is not null')
             .groupBy('country_code', 'date')
             .count()
             .sort('country_code', 'date')
             .withColumnRenamed('count', count_name))


cdr_sms_in = _count_non_null('sms_in_activity', 'sms_in')
cdr_sms_out = _count_non_null('sms_out_activity', 'sms_out')
cdr_call_in = _count_non_null('call_in_activity', 'call_in')
cdr_call_out = _count_non_null('call_out_activity', 'call_out')
cdr_internet = _count_non_null('internet_activity', 'internet')

# create a framework to list all the country_code and date combinations
cdr_country_date_frame = cdr.select('country_code').distinct().join(cdr.select('date').distinct(), how='cross').sort('country_code','date')

# Use the framework to left join all the above dataframes.
# A country/date pair with no matching rows comes out of a left join as null;
# it is filled with 0 because null + n is null, which would otherwise discard
# e.g. the sms_out count whenever sms_in had no rows for that pair (and shows
# a null instead of 0 for internet).
count_cols = ['sms_in', 'sms_out', 'call_in', 'call_out', 'internet']
cdr_count = (cdr_country_date_frame
             .join(cdr_sms_in, ['country_code', 'date'], how='left')
             .join(cdr_sms_out, ['country_code', 'date'], how='left')
             .join(cdr_call_in, ['country_code', 'date'], how='left')
             .join(cdr_call_out, ['country_code', 'date'], how='left')
             .join(cdr_internet, ['country_code', 'date'], how='left')
             .fillna(0, subset=count_cols))

# sms = sms_in + sms_out, call = call_in + call_out, internet as counted.
cdr_sum = cdr_count.select(col('country_code'), col('date'),(col('sms_in')+col('sms_out')).alias('sms'), (col('call_in')+col('call_out')).alias('call'), col('internet')).sort('country_code','date' )

cdr_sum.show()

+------------+----------+-------+------+--------+
|country_code|      date|    sms|  call|internet|
+------------+----------+-------+------+--------+
|           0|2013/11/30|  18218|  1946|    null|
|           0|2013/12/01|1131484|280328|      85|
|           0|2013/12/02|1199033|377085|     425|
|           0|2013/12/03|1245675|383305|     217|
|           0|2013/12/04|1217394|392471|     164|
|           0|2013/12/05|1211760|381485|    1033|
|           1|2013/11/30|   1244|   213|    1491|
|           1|2013/12/01|  20978| 10940|   29182|
|           1|2013/12/02|  25937| 18258|   40073|
|           1|2013/12/03|  24742| 18160|   46107|
|           1|2013/12/04|  23898| 19276|   44993|
|           1|2013/12/05|  26274| 18382|   39517|
|           7|2013/11/30|    495|    26|     299|
|           7|2013/12/01|  13670| 10995|    7864|
|           7|2013/12/02|  21548| 16103|    6284|
|           7|2013/12/03|  22037| 15906|    8173|
|           7|2013/12/04|  24318| 17180|    8850|


In [None]:
cdr_sum_path = "/tmp/cdr_sum.csv"

# Write cdr_sum as tab-delimited CSV with a header row, replacing any
# previous output at this path. The delimiter/header options and the ".csv"
# path describe a CSV export, so the CSV writer is used; the original called
# .parquet(), which writes Parquet files under the ".csv" path instead.
(cdr_sum.write
  .option("delimiter", "\t")
  .option("header", "true")
  .mode("overwrite")
  .csv(cdr_sum_path)
)

In [None]:
from pyspark.sql.window import Window 
from pyspark.sql.functions import rank 

# Rank the dates within each country_code, ordered by the `internet` column
# (default ascending order).
wSpec3 = Window.partitionBy("country_code").orderBy("internet")

cdr_ranked = cdr_sum.withColumn("rank", rank().over(wSpec3))
cdr_ranked.select('country_code', 'date', 'rank').show(10)

+------------+----------+----+
|country_code|      date|rank|
+------------+----------+----+
|           0|2013/11/30|   1|
|           0|2013/12/01|   2|
|           0|2013/12/04|   3|
|           0|2013/12/03|   4|
|           0|2013/12/02|   5|
|           0|2013/12/05|   6|
|           1|2013/11/30|   1|
|           1|2013/12/01|   2|
|           1|2013/12/05|   3|
|           1|2013/12/02|   4|
+------------+----------+----+
only showing top 10 rows

