<a href="https://colab.research.google.com/github/Dobby-Mphahlele/Problem-set-2/blob/main/Dobby_VoIP_Call_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [15]:
!pip install pyspark



### 1. Loading the data

In [19]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, sum as spark_sum, min as spark_min, max as spark_max


# Initialize Spark session
spark = SparkSession.builder.appName("IPDR Analysis").getOrCreate()

# GitHub raw file URL
url = 'https://raw.githubusercontent.com/Dobby-Mphahlele/Problem-set-2/1b478910cfb09c6768b5ebb490fe80c302d4b5ba/ipdr.csv'

# Reading the CSV file into a Pandas DataFrame
df_pd = pd.read_csv(url)

# Converting Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df_pd)

# Showing the schema and a few rows of the Spark DataFrame
spark_df.printSchema()
spark_df.show(5)


root
 |-- starttime: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- msisdn: long (nullable = true)
 |-- ulvolume: long (nullable = true)
 |-- dlvolume: long (nullable = true)
 |-- domain: string (nullable = true)

+------------------+------------------+------+--------+--------+------+
|         starttime|           endtime|msisdn|ulvolume|dlvolume|domain|
+------------------+------------------+------+--------+--------+------+
|2021-04-0212:23:10|2021-04-0212:24:48|     1|   10819|    9960|  app1|
|2021-04-0212:28:56|2021-04-0212:33:12|     1|   16067|   10663|  app1|
|2021-04-0216:24:21|2021-04-0216:38:28|     2|    1173|    4265|  app1|
|2021-04-0215:08:57|2021-04-0215:20:56|     2|    1200|     192|  app2|
|2021-04-0215:08:57|2021-04-0215:20:41|     2|  175130|  101657|  app2|
+------------------+------------------+------+--------+--------+------+
only showing top 5 rows



Converting datetime columns to timestamp type

In [22]:

spark_df = spark_df.withColumn("ST", unix_timestamp(col("starttime"), "yyyy-MM-ddHH:mm:ss").cast("timestamp"))
spark_df = spark_df.withColumn("ET", unix_timestamp(col("endtime"), "yyyy-MM-ddHH:mm:ss").cast("timestamp"))

# Show the updated DataFrame
spark_df.dtypes


[('starttime', 'string'),
 ('endtime', 'string'),
 ('msisdn', 'bigint'),
 ('ulvolume', 'bigint'),
 ('dlvolume', 'bigint'),
 ('domain', 'string'),
 ('ST', 'timestamp'),
 ('ET', 'timestamp')]

### 1. Selecting each MSISDN and specific start and end datetime domain/app wise:

In [23]:
# Group by MSISDN, domain, and VoIP APP to identify each call
call_df = spark_df.groupBy("msisdn", "domain").agg(
    spark_min("starttime").alias("First_ST"),
    spark_max("endtime").alias("Last_ET"),
    spark_sum("dlvolume").alias("Total_DL_Volume"),
    spark_sum("ulvolume").alias("Total_UL_Volume")
)

# Show the aggregated data
call_df.show()


+------+------+------------------+------------------+---------------+---------------+
|msisdn|domain|          First_ST|           Last_ET|Total_DL_Volume|Total_UL_Volume|
+------+------+------------------+------------------+---------------+---------------+
|     1|  app1|2021-04-0212:23:10|2021-04-0212:33:12|          20623|          26886|
|     2|  app1|2021-04-0216:24:21|2021-04-0216:38:28|           4265|           1173|
|     2|  app2|2021-04-0215:08:57|2021-04-0215:21:08|         103001|         182882|
|     3|  app3|2021-04-0215:54:46|2021-04-0216:11:49|        1801756|        2104664|
|     3|  app4|2021-04-0212:13:11|2021-04-0217:57:03|        8287876|        8034960|
|     4|  app3|2021-04-0212:14:42|2021-04-0221:04:25|         415065|         667564|
|     4|  app4|2021-04-0222:27:36|2021-04-0222:34:55|          33656|         218076|
+------+------+------------------+------------------+---------------+---------------+



### Calculating ET(ET-10 min) for each FDR and handling idle time exclusion

In [24]:
from pyspark.sql.functions import expr, col

# Calculate ET - 10 minutes, handling nulls with COALESCE
call_df = call_df.withColumn("ET_minus_10min", expr("COALESCE(Last_ET - interval 10 minutes, Last_ET)"))

# If ET-10 min < ST, keep the original ET
call_df = call_df.withColumn("Final_ET", expr("CASE WHEN COALESCE(ET_minus_10min, Last_ET) < First_ST THEN Last_ET ELSE ET_minus_10min END"))

# Show the updated DataFrame
call_df.show()



+------+------+------------------+------------------+---------------+---------------+------------------+------------------+
|msisdn|domain|          First_ST|           Last_ET|Total_DL_Volume|Total_UL_Volume|    ET_minus_10min|          Final_ET|
+------+------+------------------+------------------+---------------+---------------+------------------+------------------+
|     1|  app1|2021-04-0212:23:10|2021-04-0212:33:12|          20623|          26886|2021-04-0212:33:12|2021-04-0212:33:12|
|     2|  app1|2021-04-0216:24:21|2021-04-0216:38:28|           4265|           1173|2021-04-0216:38:28|2021-04-0216:38:28|
|     2|  app2|2021-04-0215:08:57|2021-04-0215:21:08|         103001|         182882|2021-04-0215:21:08|2021-04-0215:21:08|
|     3|  app3|2021-04-0215:54:46|2021-04-0216:11:49|        1801756|        2104664|2021-04-0216:11:49|2021-04-0216:11:49|
|     3|  app4|2021-04-0212:13:11|2021-04-0217:57:03|        8287876|        8034960|2021-04-0217:57:03|2021-04-0217:57:03|
|     4|

### Calculating total volume of each call in Kb

In [30]:
# Calculating total volume in Kb (since UL and DL volumes are in bytes)
call_df = call_df.withColumn("Total_Volume_Kb", (col("Total_DL_Volume") + col("Total_UL_Volume")) / 1024)

# Registering ipdr as a temporary view
call_df.createOrReplaceTempView('ipdr')
# Showing the updated DataFrame
call_df.show()



+------+------+------------------+------------------+---------------+---------------+------------------+------------------+---------------+
|msisdn|domain|          First_ST|           Last_ET|Total_DL_Volume|Total_UL_Volume|    ET_minus_10min|          Final_ET|Total_Volume_Kb|
+------+------+------------------+------------------+---------------+---------------+------------------+------------------+---------------+
|     1|  app1|2021-04-0212:23:10|2021-04-0212:33:12|          20623|          26886|2021-04-0212:33:12|2021-04-0212:33:12|  46.3955078125|
|     2|  app1|2021-04-0216:24:21|2021-04-0216:38:28|           4265|           1173|2021-04-0216:38:28|2021-04-0216:38:28|    5.310546875|
|     2|  app2|2021-04-0215:08:57|2021-04-0215:21:08|         103001|         182882|2021-04-0215:21:08|2021-04-0215:21:08| 279.1826171875|
|     3|  app3|2021-04-0215:54:46|2021-04-0216:11:49|        1801756|        2104664|2021-04-0216:11:49|2021-04-0216:11:49|  3814.86328125|
|     3|  app4|2021-

### Calculating total time of each call in seconds

In [36]:
from pyspark.sql.functions import unix_timestamp, col

# Calculate total time in seconds
total_time_df = spark.sql("""
  SELECT
    *,
    (UNIX_TIMESTAMP(Last_ET, "yyyy-MM-ddHH:mm:ss") - UNIX_TIMESTAMP(First_ST, "yyyy-MM-ddHH:mm:ss")) AS Total_Time_Sec
  FROM ipdr
""")
total_time_df.createOrReplaceTempView("volume_time_ipdr")
total_time_df.show()






+------+------+------------------+------------------+---------------+---------------+------------------+------------------+---------------+--------------+
|msisdn|domain|          First_ST|           Last_ET|Total_DL_Volume|Total_UL_Volume|    ET_minus_10min|          Final_ET|Total_Volume_Kb|Total_Time_Sec|
+------+------+------------------+------------------+---------------+---------------+------------------+------------------+---------------+--------------+
|     1|  app1|2021-04-0212:23:10|2021-04-0212:33:12|          20623|          26886|2021-04-0212:33:12|2021-04-0212:33:12|  46.3955078125|           602|
|     2|  app1|2021-04-0216:24:21|2021-04-0216:38:28|           4265|           1173|2021-04-0216:38:28|2021-04-0216:38:28|    5.310546875|           847|
|     2|  app2|2021-04-0215:08:57|2021-04-0215:21:08|         103001|         182882|2021-04-0215:21:08|2021-04-0215:21:08| 279.1826171875|           731|
|     3|  app3|2021-04-0215:54:46|2021-04-0216:11:49|        1801756| 

### Calculating bit rate (kbps) of each call

In [45]:
bit_rate_df = spark.sql(
    """
    SELECT
      *,
    ((Total_Volume_Kb) / (Total_Time_Sec)) * 1000  AS Bit_Rate_Kbps
    FROM volume_time_ipdr
  """
)
bit_rate_df.createOrReplaceTempView("bit_rate_ipdr")
bit_rate_df.show()

+------+------+------------------+------------------+---------------+---------------+------------------+------------------+---------------+--------------+------------------+
|msisdn|domain|          First_ST|           Last_ET|Total_DL_Volume|Total_UL_Volume|    ET_minus_10min|          Final_ET|Total_Volume_Kb|Total_Time_Sec|     Bit_Rate_Kbps|
+------+------+------------------+------------------+---------------+---------------+------------------+------------------+---------------+--------------+------------------+
|     1|  app1|2021-04-0212:23:10|2021-04-0212:33:12|          20623|          26886|2021-04-0212:33:12|2021-04-0212:33:12|  46.3955078125|           602| 77.06894985465117|
|     2|  app1|2021-04-0216:24:21|2021-04-0216:38:28|           4265|           1173|2021-04-0216:38:28|2021-04-0216:38:28|    5.310546875|           847| 6.269831021251475|
|     2|  app2|2021-04-0215:08:57|2021-04-0215:21:08|         103001|         182882|2021-04-0215:21:08|2021-04-0215:21:08| 279.18

### Identification of Audio or Video call and its count

In [47]:
result_df = spark.sql("""
SELECT
    *,
    CASE
        WHEN Bit_Rate_Kbps <= 200 THEN 'Yes'
        ELSE 'No'
    END AS isAudio,
    CASE
        WHEN Bit_Rate_Kbps > 200 THEN 'Yes'
        ELSE 'No'
    END AS isVideo
FROM bit_rate_ipdr
WHERE Bit_Rate_Kbps >= 10  --- Filtering out calls with bit rate < 10 kbps
"""

)
result_df.show()

+------+------+------------------+------------------+---------------+---------------+------------------+------------------+---------------+--------------+------------------+-------+-------+
|msisdn|domain|          First_ST|           Last_ET|Total_DL_Volume|Total_UL_Volume|    ET_minus_10min|          Final_ET|Total_Volume_Kb|Total_Time_Sec|     Bit_Rate_Kbps|isAudio|isVideo|
+------+------+------------------+------------------+---------------+---------------+------------------+------------------+---------------+--------------+------------------+-------+-------+
|     1|  app1|2021-04-0212:23:10|2021-04-0212:33:12|          20623|          26886|2021-04-0212:33:12|2021-04-0212:33:12|  46.3955078125|           602| 77.06894985465117|    Yes|     No|
|     2|  app2|2021-04-0215:08:57|2021-04-0215:21:08|         103001|         182882|2021-04-0215:21:08|2021-04-0215:21:08| 279.1826171875|           731| 381.9187649623803|     No|    Yes|
|     3|  app3|2021-04-0215:54:46|2021-04-0216:11:

### Result

In [48]:

result_df.select("msisdn", "domain", "Total_Time_Sec", "Total_Volume_Kb", "Bit_Rate_kbps", "isAudio", "isVideo")
result_df.show(truncate=False)



+------+------+------------------+------------------+---------------+---------------+------------------+------------------+---------------+--------------+------------------+-------+-------+
|msisdn|domain|First_ST          |Last_ET           |Total_DL_Volume|Total_UL_Volume|ET_minus_10min    |Final_ET          |Total_Volume_Kb|Total_Time_Sec|Bit_Rate_Kbps     |isAudio|isVideo|
+------+------+------------------+------------------+---------------+---------------+------------------+------------------+---------------+--------------+------------------+-------+-------+
|1     |app1  |2021-04-0212:23:10|2021-04-0212:33:12|20623          |26886          |2021-04-0212:33:12|2021-04-0212:33:12|46.3955078125  |602           |77.06894985465117 |Yes    |No     |
|2     |app2  |2021-04-0215:08:57|2021-04-0215:21:08|103001         |182882         |2021-04-0215:21:08|2021-04-0215:21:08|279.1826171875 |731           |381.9187649623803 |No     |Yes    |
|3     |app3  |2021-04-0215:54:46|2021-04-0216:11:

In [50]:
# Registering final_df as a temporary view
result_df.createOrReplaceTempView("result_df")

# Perform the aggregation using Spark SQL
final_df = spark.sql("""
  SELECT
    msisdn,
    domain,
    Total_Time_Sec,
    Total_Volume_Kb,
    Bit_Rate_kbps as kbps,
    COUNT(*) OVER (PARTITION BY msisdn) AS fdr_count,
    isAudio,
    isVideo
  FROM result_df
  GROUP BY msisdn, domain, Total_Time_Sec, Total_Volume_Kb, Bit_Rate_kbps, isAudio, isVideo
""")

final_df.show(truncate=False)




+------+------+--------------+---------------+------------------+---------+-------+-------+
|msisdn|domain|Total_Time_Sec|Total_Volume_Kb|kbps              |fdr_count|isAudio|isVideo|
+------+------+--------------+---------------+------------------+---------+-------+-------+
|1     |app1  |602           |46.3955078125  |77.06894985465117 |1        |Yes    |No     |
|2     |app2  |731           |279.1826171875 |381.9187649623803 |1        |No     |Yes    |
|3     |app3  |1023          |3814.86328125  |3729.094116568915 |2        |No     |Yes    |
|3     |app4  |20632         |15940.26953125 |772.5993374975766 |2        |No     |Yes    |
|4     |app3  |31783         |1057.2548828125|33.264791958358245|2        |Yes    |No     |
|4     |app4  |439           |245.83203125   |559.9818479498862 |2        |No     |Yes    |
+------+------+--------------+---------------+------------------+---------+-------+-------+

