# FreeSWITCH Log Analysis with PySpark

## Altanai Bisht 
( abisht@seattleu.edu
altanai.telecom.com)

License: GPL-3.0

In [None]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

# Attach to the already-running SparkSession, or start one if none exists.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
%matplotlib inline

# 1. Call Record

FreeSWITCH writes CDRs to /usr/local/freeswitch/log/cdr-csv/Master.csv.
Ref: https://freeswitch.org/confluence/display/FREESWITCH/CDR
and https://freeswitch.org/confluence/display/FREESWITCH/mod_cdr_csv

In [None]:
# logfile = "/fslogs/*.log"
# fslogs = sc.textFile("hdfs://localhost:9000/"+logfile)

from pyspark import SparkFiles

# Load every FreeSWITCH cdr-csv file matching the glob into one DataFrame.
# The path is handed straight to Spark: SparkFiles.get() only resolves files
# distributed with SparkContext.addFile(), and joining a local path onto the
# SparkFiles root directory yields a bogus path on non-Windows systems.
cdr_path = "C:\\Users\\abisht\\FS_logs\\*.csv"
df = spark.read.csv(cdr_path, header=False, inferSchema=True)
# Columns are unnamed (_c0 ... _cN) because header=False. Assuming the default
# mod_cdr_csv template (TODO confirm against cdr_csv.conf.xml), they are:
#   _c0 caller_id_name, _c1 caller_id_number, _c2 destination_number,
#   _c3 context, _c4 start_stamp, _c5 answer_stamp, _c6 end_stamp,
#   _c7 duration, _c8 billsec, _c9 hangup_cause, _c10 uuid, _c11 bleg_uuid,
#   _c12 accountcode, _c13 read_codec, _c14 write_codec

In [None]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
import re

# Example Freeswitch cdr-csv log line:
# "8001","8001","0046423112856","public","2020-06-13 00:58:52","2020-06-13 00:58:52","2020-06-13 00:58:53","1","1","USER_NOT_REGISTERED","f2b05a36-ace2-11ea-aa3b-153da8caf925","","","PCMU","PCMU"
# FS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'

In [None]:
# Peek at the first two CDR rows (returned as a list of Row objects).
df.take(2)

In [None]:
# df2 = spark.createDataFrame(df)
# TypeError: data is already a DataFrame

In [None]:
# Total number of CDR records loaded from all matching CSV files.
row_total = df.count()
row_total

In [None]:
# Print a five-row sample, then the column types Spark inferred.
df.show(n=5)
df.printSchema()

In [None]:
# Print the first record with one field per line; easier to read than a wide
# table of unnamed columns.
df.show(n=1, vertical=True)

**DataFrame.collect()** gathers the distributed data onto the driver as local Python objects. Note that this can throw an out-of-memory error when the dataset is too large to fit on the driver, because all the data is transferred from the executors to the driver.

In [None]:
# Materialise every CDR row on the driver as a list of Row objects.
# WARNING: the whole dataset must fit in driver memory.
all_rows = df.collect()
all_rows

**Group by** call duration (`_c7`)

In [None]:
from pyspark.sql import functions as F

# For each call duration value (_c7): the number of calls and the mean of
# _c8 (presumably billsec - confirm against the CDR template).
# A bare .avg() would average *every* numeric column, including caller /
# destination IDs if inferSchema parsed them as numbers, which is meaningless,
# so only the meaningful aggregates are computed.
(df.groupBy('_c7')
   .agg(F.count('*').alias('calls'),
        F.avg('_c8').alias('avg_billsec'))
   .orderBy('_c7')
   .show())

In [None]:
# from pyspark.sql.functions import date_format

# df2 = df.withColumn('startTime', date_format('_c4', 'YYYY-MM-DD  HH:mm:ss'))
# df2["startTime"].show()

In [None]:
# df2.head()
# df2["startTime"].head(2)

### Number of unique daily callers

In [None]:
from pyspark.sql import functions as F

# One row per call: the caller (_c1) and the day-of-month of _c4
# (presumably the call start timestamp).
host_day_df = df.select(
    F.col('_c1'),
    F.dayofmonth(F.col('_c4')).alias('day'),
)
host_day_df.show(n=5, truncate=False)

**host_day_distinct_df**

In [None]:
# Use the fully-qualified option name: the bare pattern 'max_rows' also
# matches 'styler.render.max_rows' on pandas >= 1.4 and raises OptionError.
def_mr = pd.get_option('display.max_rows')  # remembered so it can be restored
pd.set_option('display.max_rows', 10)

# Distinct callers per day: drop repeated (caller, day) pairs first,
# otherwise every call is counted and this is merely calls-per-day.
daily_hosts_df = (host_day_df
                  .dropDuplicates()
                  .groupBy('day')
                  .count()
                  .sort('day'))

daily_hosts_df = daily_hosts_df.toPandas()
daily_hosts_df

In [None]:
# Point plot of the per-day counts computed in the previous cell.
c = sns.catplot(
    data=daily_hosts_df,
    x='day',
    y='count',
    kind='point',
    height=5,
    aspect=1.5,
)

## Average number of daily calls per caller

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Distinct callers per day: duplicate (caller, day) pairs are dropped so each
# caller is counted once per day.
daily_hosts_df = (host_day_df
                  .dropDuplicates()
                  .groupBy('day')
                  .count()
                  .select(col("day"), col("count").alias("total_hosts")))

# Total calls per day. The CSV is read with header=False, so columns are named
# _c0.._cN; the day comes from _c4 (presumably the start timestamp), the same
# column host_day_df uses.
total_daily_reqests_df = (df
                          .select(F.dayofmonth("_c4").alias("day"))
                          .groupBy("day")
                          .count()
                          .select(col("day"), col("count").alias("total_reqs")))

# avg_reqs = calls per distinct caller for each day.
avg_daily_reqests_per_host_df = total_daily_reqests_df.join(daily_hosts_df, 'day')
avg_daily_reqests_per_host_df = (avg_daily_reqests_per_host_df
                                 .withColumn('avg_reqs', col('total_reqs') / col('total_hosts'))
                                 .sort("day"))
avg_daily_reqests_per_host_df = avg_daily_reqests_per_host_df.toPandas()
avg_daily_reqests_per_host_df

In [None]:
# Point plot of the average calls per caller (avg_reqs) for each day.
c = sns.catplot(
    data=avg_daily_reqests_per_host_df,
    x='day',
    y='avg_reqs',
    kind='point',
    height=5,
    aspect=1.5,
)

## Aggregate call-duration statistics

In [None]:
# Summary statistics of the call duration column (_c7). The output columns are
# named after the metric actually measured, not "content size".
(df.agg(F.min(df['_c7']).alias('min_duration'),
        F.max(df['_c7']).alias('max_duration'),
        F.mean(df['_c7']).alias('mean_duration'),
        F.stddev(df['_c7']).alias('std_duration'),
        F.count(df['_c7']).alias('count_duration'))
   .toPandas())

## 4.3 Call hangup cause (status code) analysis

In [None]:
# Frequency of each hangup cause (_c9, e.g. NORMAL_CLEARING), sorted by cause
# and cached because the following cells reuse it.
status_freq_df = (df.groupBy('_c9')
                    .count()
                    .sort('_c9')
                    .cache())
# These are SIP/FreeSWITCH hangup causes, not HTTP status codes.
print('Total distinct hangup causes:', status_freq_df.count())

In [None]:
# Bring the small per-cause table into pandas, most frequent cause first.
status_freq_pd_df = status_freq_df.toPandas()
status_freq_pd_df = status_freq_pd_df.sort_values(by=['count'], ascending=False)
status_freq_pd_df

In [None]:
# Bar chart of hangup-cause frequencies; bars follow the sorted table order and
# the long cause names are tilted so they stay legible.
sns.catplot(
    data=status_freq_pd_df,
    x='_c9',
    y='count',
    kind='bar',
    order=status_freq_pd_df['_c9'],
)
plt.xticks(rotation=45)

# 3. Counting NORMAL_CLEARING hangup causes

In [None]:
# Keep only calls whose hangup cause is NORMAL_CLEARING; cached for reuse.
normal_clearing_df = df.where(df._c9 == 'NORMAL_CLEARING').cache()
print(f'Total NORMAL_CLEARING hangupcode: {normal_clearing_df.count()}')

In [None]:
# Top 20 callers (_c1) ranked by their number of NORMAL_CLEARING calls.
normal_clearing_count_df = (
    normal_clearing_df
    .groupBy("_c1")
    .count()
    .orderBy(F.desc("count"))
    .limit(20)
)
normal_clearing_count_df.show(truncate=False)

## NORMAL_CLEARING calls per day

In [None]:
# NORMAL_CLEARING calls grouped by day-of-month of _c4, in day order.
normal_clearing_perday_df = (
    normal_clearing_df
    .groupBy(F.dayofmonth('_c4').alias('day'))
    .count()
    .orderBy('day')
)

normal_clearing_perday_pd_df = normal_clearing_perday_df.toPandas()
normal_clearing_perday_pd_df

In [None]:
# Point plot of NORMAL_CLEARING calls per day.
c = sns.catplot(
    data=normal_clearing_perday_pd_df,
    x='day',
    y='count',
    kind='point',
    height=5,
    aspect=1.5,
)

## Top three days for normal clearing

In [None]:
# Top three days by number of NORMAL_CLEARING calls. This must use the
# per-day counts; normal_clearing_count_df holds per-caller counts.
(normal_clearing_perday_df
    .sort("count", ascending=False)
    .show(3))

## Hourly normal clearings

In [None]:
# NORMAL_CLEARING calls per hour of _c4. Despite "avg" in the variable names,
# these are raw counts per hour, not averages.
hourly_avg_normal_clearing_df = (
    normal_clearing_df
    .groupBy(F.hour('_c4').alias('hour'))
    .count()
    .orderBy('hour')
)
hourly_avg_normal_clearing_pd_df = hourly_avg_normal_clearing_df.toPandas()

c = sns.catplot(
    data=hourly_avg_normal_clearing_pd_df,
    x='hour',
    y='count',
    kind='bar',
    height=5,
    aspect=1.5,
)

## Top call destinations

In [None]:
# Ten most frequent values of _c2 (presumably the dialled destination number).
host_sum_df = (
    df.groupBy('_c2')
      .count()
      .orderBy(F.desc('count'))
      .limit(10)
)

host_sum_df.show(truncate=False)

# 4. Calls with non-zero duration per hour

In [None]:
# Calls with a positive duration (_c7); cached for the following cells.
callduration_df = df.where(df._c7 > 0).cache()
print(f'Total Calls with non zero Duration is {callduration_df.count()}')

In [None]:
# Plot the duration column (_c7) of each non-zero-duration call. Converting the
# whole frame would also plot every other numeric column (e.g. caller IDs, if
# inferSchema parsed them as numbers) and ship all columns to the driver.
callduration_df.select('_c7').toPandas().plot()

In [None]:
# Number of calls with a positive duration, bucketed by hour of _c4.
# Note: this counts calls per hour; it does not sum their durations.
hourly_callduration_df = (
    callduration_df
    .groupBy(F.hour('_c4').alias('hour'))
    .count()
    .orderBy('hour')
)
hourly_callduration_pd_df = hourly_callduration_df.toPandas()

c = sns.catplot(
    data=hourly_callduration_pd_df,
    x='hour',
    y='count',
    kind='bar',
    height=5,
    aspect=1.5,
)

In [None]:
# Destination (_c2) in the ninth row (0-based index 8) of the top-10 table.
# Raises IndexError if fewer than nine distinct destinations exist.
host_sum_pd_df = host_sum_df.toPandas()
host_sum_pd_df['_c2'].iloc[8]

# Carbon Footprint

In [None]:
import carbonfootprint as carbonfp
# from carbonfp.power_utilities.carbon import Carbon
# from power_utilities.fuelmix import Fuelmix
# # from power_utilities.Fuelmix import calculate_fuelmixbypercent
# # from power_utilities import carbon_by_fuelmix
# from power_utilities import fuelmix

In [None]:
# Display the imported carbonfootprint module object (presumably a quick check
# that the import in the previous cell succeeded).
carbonfp

In [None]:
import pkgutil

# List the submodules of the carbonfootprint package. iter_modules lives in the
# stdlib pkgutil module (not on the package object) and takes a search path;
# the package's own __path__ is used. A plain (non-package) module has no
# __path__, in which case nothing is listed instead of raising AttributeError.
for importer, modname, ispkg in pkgutil.iter_modules(getattr(carbonfp, '__path__', [])):
    print("Found submodule %s (is a package: %s)" % (modname, ispkg))

In [None]:
# Build a Fuelmix helper from the carbonfootprint package; "north_west" is
# presumably a region key understood by that package (not visible here).
furlpercent_obj = carbonfp.power_utilities.Fuelmix(
    "north_west",
)

In [None]:

# Presumably: apply fuel-mix percentages to hourly data, compute carbon
# emissions from the result, then print a preview (head + column names) of it.
# NOTE(review): `dfindus_hourly` is not defined in any cell visible here; it
# must be created before this cell can run.
dfindus_hourly_fuelmix = furlpercent_obj.calculate_fuelmixbypercent(dfindus_hourly)
# NOTE(review): `Carbon` is not imported. Its import in the carbonfootprint
# import cell is commented out, and that line imports from `carbonfp`, which is
# only an alias, not an importable package name. TODO fix the import.
carbonemissio_pbj = Carbon()
dfindus_hourly_fuelmix_carbon = carbonemissio_pbj.calculate_carbonemission(dfindus_hourly_fuelmix)
# .head()/.columns suggest a pandas DataFrame result - TODO confirm.
print("-------------dfindus_hourly_fuelmix_carbon------------------")
print(dfindus_hourly_fuelmix_carbon.head())
print(dfindus_hourly_fuelmix_carbon.columns)


**Refs**
- https://opensource.com/article/19/5/visualize-log-data-apache-spark