## Creating Spark Session

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F

# Build the Spark session for this notebook (an already-running session is
# reused by getOrCreate) under the "Content Watchtime" application name.
spark = (
    SparkSession.builder
    .appName("Content Watchtime")
    .getOrCreate()
)

# Limit Spark's own log output to warnings and above.
spark.sparkContext.setLogLevel('WARN')

## Reading the tab-delimited CSV file

In [8]:
# Load the header-less, tab-delimited file; Spark infers the column types and
# assigns positional names (_c0, _c1, ...).
data = (
    spark.read
    .format('com.databricks.spark.csv')
    .option('header', False)
    .option('inferschema', 'true')
    .option('delimiter', "\t")
    .load('sparkBigData.csv')
)

data.show()

+---+-----+--------------------+------+------+---+------+-----+
|_c0|  _c1|                 _c2|   _c3|   _c4|_c5|   _c6|  _c7|
+---+-----+--------------------+------+------+---+------+-----+
| de|31334|[07/Aug/2015:20:3...|+0000]| 76410|302|   435|0.326|
| pl|50231|[07/Aug/2015:20:3...|+0000]|126746|200|  7400|  0.0|
| gr| 3329|[07/Aug/2015:20:3...|+0000]|126474|206| 17711|  0.0|
| tr| 9121|[07/Aug/2015:20:3...|+0000]| 76406|200| 19589|  0.0|
| se| 3301|[08/Aug/2015:16:2...|+0000]| 76406|200| 17960|  0.0|
| kz| 9198|[08/Aug/2015:16:3...|+0000]| 76406|200|  2403|  0.0|
| ch| 6830|[08/Aug/2015:16:3...|+0000]| 76406|200| 16099|  0.0|
| rs|21246|[09/Aug/2015:17:1...|+0000]| 76406|200| 64127|  0.0|
| rs|21246|[09/Aug/2015:17:1...|+0000]| 76406|200| 64704|  0.0|
| ro| 9050|[09/Aug/2015:20:3...|+0000]|126518|206|302932|0.504|
| at| 8447|[09/Aug/2015:20:3...|+0000]| 55510|200| 33494|0.006|
| de| 3320|[09/Aug/2015:20:3...|+0000]| 55510|200|603522|2.892|
| de| 3320|[10/Aug/2015:09:1...|+0000]| 

In [12]:
## UDF to strip the square brackets from the string timestamp so it can be parsed into a Spark timestamp

In [9]:
# Helper (wrapped as a UDF) that removes [ ] from the string.
def timeStamp(timeinList):
    """Return ``timeinList`` with every ``[`` and ``]`` character removed.

    Args:
        timeinList: Bracketed timestamp text, e.g.
            ``"[07/Aug/2015:20:30:01+0000]"``, or ``None``.

    Returns:
        The text without square brackets. ``None`` is passed through
        unchanged so a null cell stays null instead of raising
        ``AttributeError`` when this runs inside a Spark UDF.
    """
    if timeinList is None:
        return None
    return timeinList.replace("[", '').replace(']', '')

# Spark UDF around timeStamp; produces a string column.
timer = F.udf(timeStamp, StringType())

In [10]:
# Format of the input date once the two timestamp columns are merged and the
# square brackets are removed, e.g. "07/Aug/2015:20:30:01+0000".
# NOTE(review): this name shadows the builtin `format`; it is kept as-is in case
# other cells refer to it.
format = "dd/MMM/yyyy:HH:mm:ssZ"

# Steps performed below:
#   1. Give the positional CSV columns readable headers.
#   2. Merge _c2 and _c3 (date-time and zone offset), strip the brackets and
#      parse the result into a timestamp column.
#   3. Compute throughput as Bytes divided by the Send/Time column.
#   4. Extract the hour from the timestamp for later grouping.
data = (
    data.select(
        F.col('_c0').alias('Country'),
        F.col('_c1').alias('ASN'),
        # The brackets are stripped with the built-in regexp_replace rather than
        # a Python UDF, so the rows are not serialized out to a Python worker.
        F.unix_timestamp(
            F.regexp_replace(
                F.concat_ws("", F.col('_c2'), F.col('_c3')),
                r"[\[\]]",
                "",
            ),
            format=format,
        ).cast('timestamp').alias('timestamp'),
        # The trailing space in this alias is preserved on purpose: code that
        # selects the column by name would break if it were changed here.
        F.col('_c4').alias('Metric A '),
        F.col('_c5').alias('Co Server'),
        F.col('_c6').alias('Bytes'),
        F.col('_c7').alias('Send/Time'),
    )
    .withColumn(
        'Throughput',
        # Divide only when the divisor is non-zero. Without the guard the result
        # depends on Spark returning null for x/0, which becomes an error when
        # ANSI SQL mode is enabled. Unmatched rows yield null here.
        F.when(
            F.col('Send/Time') != 0,
            F.col('Bytes') / F.col('Send/Time'),
        ),
    )
    # NOTE(review): hour() is evaluated in the Spark session time zone, so the
    # value can differ from the hour written in the source log -- confirm that
    # this is the intended bucketing.
    .withColumn('hour', F.hour('timestamp'))
)

# Rows with an undefined throughput (zero or missing send time) become 0.0.
data = data.fillna({'Throughput': 0.0})
data.show(truncate=False)


+-------+-----+-------------------+---------+---------+------+---------+------------------+----+
|Country|ASN  |timestamp          |Metric A |Co Server|Bytes |Send/Time|Throughput        |hour|
+-------+-----+-------------------+---------+---------+------+---------+------------------+----+
|de     |31334|2015-08-08 02:00:01|76410    |302      |435   |0.326    |1334.355828220859 |2   |
|pl     |50231|2015-08-08 02:00:01|126746   |200      |7400  |0.0      |0.0               |2   |
|gr     |3329 |2015-08-08 02:00:01|126474   |206      |17711 |0.0      |0.0               |2   |
|tr     |9121 |2015-08-08 02:00:02|76406    |200      |19589 |0.0      |0.0               |2   |
|se     |3301 |2015-08-08 21:59:00|76406    |200      |17960 |0.0      |0.0               |21  |
|kz     |9198 |2015-08-08 22:00:02|76406    |200      |2403  |0.0      |0.0               |22  |
|ch     |6830 |2015-08-08 22:02:22|76406    |200      |16099 |0.0      |0.0               |22  |
|rs     |21246|2015-08-09 22:4

In [11]:
# Average throughput per grouping key.
# Rows whose Throughput is 0.0 are dropped first. That removes the rows where
# the send time was 0 (their throughput was filled with 0.0 earlier) as well as
# any row whose computed throughput is genuinely zero.
rows_with_throughput = data.where(F.col('Throughput') != 0.0)

# One entry per report, in display order.
grouping_keys = [
    ('Co Server',),
    ('ASN',),
    ('ASN', 'Co Server'),
    ('Country', 'ASN'),
    ('Country', 'Co Server'),
    ('hour', 'Co Server'),
    ('hour', 'ASN'),
]

for keys in grouping_keys:
    rows_with_throughput.groupBy(*keys).agg(F.avg('Throughput')).show()

+---------+------------------+
|Co Server|   avg(Throughput)|
+---------+------------------+
|      206|1077081.4964914257|
|      200|2138478.5550050954|
|      302| 1334.355828220859|
+---------+------------------+

+-----+------------------+
|  ASN|   avg(Throughput)|
+-----+------------------+
| 3320|1523100.0164156894|
| 1901| 1505408.510638298|
| 8447| 3369235.632183908|
|15685|1124780.4232804233|
|31334| 1334.355828220859|
| 9050| 601055.5555555555|
+-----+------------------+

+-----+---------+------------------+
|  ASN|Co Server|   avg(Throughput)|
+-----+---------+------------------+
|31334|      302| 1334.355828220859|
| 3320|      200|1523100.0164156894|
| 1901|      206| 1505408.510638298|
|15685|      206|1124780.4232804233|
| 9050|      206| 601055.5555555555|
| 8447|      200| 3369235.632183908|
+-----+---------+------------------+

+-------+-----+------------------+
|Country|  ASN|   avg(Throughput)|
+-------+-----+------------------+
|     at| 8447| 3369235.632183908|
