Assignment 2: Analyzing Call Records with Apache Spark
Objective: In this task, you will work with a dataset of call records to analyze call patterns using Apache Spark. This exercise will familiarize you with data preprocessing and basic data analysis using Spark's capabilities.
Dataset: Create a dataset of call records in CSV format (at least 1000 records). You can use a website such as https://www.mockaroo.com/ or generate the data yourself with PySpark.
Instructions:
1.	Data Loading: (10%)
•	Create a dataset containing the caller ID, the receiver ID, the call duration, the call timestamp, and the location. Determine an appropriate data type for each variable. (8%)
•	Load the call records dataset into an RDD or DataFrame in Apache Spark. (2%)
2.	Data Exploration: (10%)
•	Explore the dataset to understand its structure and contents. Display sample records to get a sense of the data:
1.	Dimensions; (1%)
2.	Datatypes; (2%)
3.	Summary statistics; (3%)
4.	Unique and missing values; (1%)
5.	Correlation matrix. (3%)
3.	Data Preprocessing: (20%)
•	Clean the data by handling missing values, if any. 
•	Convert the timestamp field to a proper datetime format.
•	Extract relevant features from the data, such as day of the week, time of day, or call duration categories.
4.	Data Analysis: (25%)
•	Calculate statistics like the total number of calls, average call duration, and peak call times. (10%)
•	Identify the most frequent callers and receivers. (5%)
•	Find out if there are any unusual call patterns or anomalies in the data. (10%)
5.	Visualization: (15%)
•	Create visualizations (e.g., bar charts, line plots) using Spark's built-in visualization libraries or export data for external visualization tools.
6.	Conclusion: (20%)
•	Summarize the key insights gained from the data analysis. (10%)
•	What patterns or trends did you discover in the call records dataset? (10%)
Bonus (8%): Create the CSV dataset using PySpark, with original code written by you.


In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
spark = SparkSession.builder.getOrCreate()

In [5]:
schema = StructType([
    StructField("caller_id", IntegerType(), True),
    StructField("receiver_id", IntegerType(), True),
    StructField("duration", IntegerType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("location", StringType(), True)
])

In [6]:
import random
import datetime

# Candidate call locations, defined once instead of on every loop iteration
locations = [
    "Almaty", "New York", "London", "Paris", "Tokyo", "Astana", "Shymkent", "Aktau", "Aktobe", "Atyrau", "Oral", "Kyzylorda",
    "Taraz", "Turkistan", "Karagandy", "Pavlodar", "Petropavlovsk", "Semey", "Ust-Kamenogorsk", "Talgar", "Moscow", "Kiev", "Kyoto", "Beijing",
    "Washington", "Lisbon", "Madrid", "Barcelona", "Valencia", "Buenos Aires", "Rome", "Amsterdam", "Vienna", "Berlin", "Sofia", "Prague",
    "Stockholm", "Bucharest", "Vatican", "Montreal", "Monaco", "Toronto", "Canberra", "Seattle", "Ottawa", "Mexico City", "Rio De Janeiro", "Brasilia",
    "New-Delhi", "Baku", "Berne", "Frankfurt", "Munich", "Dublin", "Ankara", "Istanbul", "Ashhabad", "Bangkok", "Dushanbe", "Ljubljana",
    "Singapore", "Belgrade", "Er-Riyadh", "Warsaw", "Panama", "Abu Dhabi", "Oslo", "Kathmandu", "Kishinev", "Rabat", "Kuala Lumpur", "Jakarta",
    "Baghdad", "Tehran", "Amman", "Kabul", "Yerevan", "Tbilisi", "Algeria", "Dhaka", "Caracas", "Hanoi", "Seoul", "Pyongyang",
    "Tallinn", "Santiago", "Zagreb", "Vilnius", "Tripoli", "Riga", "Havana", "Beirut", "Ulaanbaatar", "Luxembourg", "Copenhagen",
]

# Generate random data for 1000 records
data = []
for _ in range(1000):
    caller_id = random.randint(1, 10000)
    receiver_id = random.randint(1, 10000)
    duration = random.randint(1, 3600)  # Duration in seconds
    # Only the day offset is random, so every record keeps the time of day at which this cell ran
    timestamp = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 365))
    location = random.choice(locations)

    data.append((caller_id, receiver_id, duration, timestamp, location))
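
The loop above subtracts only whole days from the current time, so all generated calls share one time of day and any later "peak call time" analysis has a single bucket to choose from. A minimal sketch of one way to spread calls across the day, assuming a fixed reference time; the helper name random_timestamp and the seed are illustrative and not part of the original notebook:

```python
import datetime
import random


def random_timestamp(reference, max_days=365, rng=random):
    """Return a timestamp up to max_days before reference, at a random time of day."""
    # Hypothetical helper: draw the offset in seconds rather than in whole days
    offset_seconds = rng.randint(0, max_days * 24 * 3600)
    return reference - datetime.timedelta(seconds=offset_seconds)


reference = datetime.datetime(2023, 10, 1, 12, 0, 0)  # assumed reference time
rng = random.Random(42)  # seeded so that the sketch is reproducible
samples = [random_timestamp(reference, rng=rng) for _ in range(1000)]

# Count how many distinct hours of the day appear in the sample
hours_seen = {ts.hour for ts in samples}
print("Distinct hours of the day:", len(hours_seen))
```

Drawing the offset in seconds keeps the same one-year window while giving the time-of-day feature something to distinguish.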

In [7]:
df = spark.createDataFrame(data, schema)

In [8]:
# Note: Spark writes a directory of part files at this path, not a single CSV file
# df.write.csv("call_records.csv", header=True)

In [9]:
df.show()

+---------+-----------+--------+--------------------+--------------+
|caller_id|receiver_id|duration|           timestamp|      location|
+---------+-----------+--------+--------------------+--------------+
|      523|       8303|    1454|2023-08-31 17:40:...|       Vilnius|
|     8450|       7743|    1539|2023-02-14 17:40:...|     Amsterdam|
|     8848|       1815|     581|2022-12-30 17:40:...|     Kathmandu|
|     1989|       3532|    1996|2023-03-27 17:40:...|     Singapore|
|     7980|       4544|    3275|2023-07-26 17:40:...|     New-Delhi|
|     2852|       7607|    3381|2023-02-01 17:40:...|         Dhaka|
|     3204|       5257|    2388|2023-05-29 17:40:...|      Shymkent|
|     7429|       6708|    2688|2023-02-06 17:40:...|         Paris|
|     9457|       5453|    1620|2023-01-23 17:40:...|Rio De Janeiro|
|     8296|       9888|    1577|2023-09-27 17:40:...|      Montreal|
|     8414|       5048|    3391|2023-01-30 17:40:...|         Kabul|
|     3755|       2297|    2559|20

In [10]:
df.select("location").distinct().count()

95

In [11]:
df.printSchema()

root
 |-- caller_id: integer (nullable = true)
 |-- receiver_id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- location: string (nullable = true)



In [12]:
from pyspark.sql.functions import col  # 'sum' was unused and shadowed Python's built-in sum

# Count the null values in each column
for name in df.columns:
    null_count = df.filter(col(name).isNull()).count()
    print(f"Number of null values in '{name}':", null_count)


Number of null values in 'caller_id': 0
Number of null values in 'receiver_id': 0
Number of null values in 'duration': 0
Number of null values in 'timestamp': 0
Number of null values in 'location': 0


In [13]:
# Dimensions
print("Number of rows in the dataset:", df.count())
print("Number of columns in the dataset:", len(df.columns))

# Datatypes (printed explicitly: a bare expression is only displayed when it is the last line of a cell)
print("Data types of each column:")
print(df.dtypes)

# Summary statistics
print("Summary statistics:")
df.describe().show()

# Unique values (missing values were counted in the previous cell)
print("Number of unique values in 'caller_id':", df.select("caller_id").distinct().count())
print("Number of unique values in 'receiver_id':", df.select("receiver_id").distinct().count())
print("Number of unique values in 'duration':", df.select("duration").distinct().count())
print("Number of unique values in 'location':", df.select("location").distinct().count())

# Correlation between two numeric columns (a single pair, not a full matrix)
print("Correlation between 'caller_id' and 'receiver_id':")
df.stat.corr("caller_id", "receiver_id")


Number of rows in the dataset: 1000
Number of columns in the dataset: 5
Data types of each column:
[('caller_id', 'int'), ('receiver_id', 'int'), ('duration', 'int'), ('timestamp', 'timestamp'), ('location', 'string')]
Summary statistics:
+-------+------------------+----------------+-----------------+---------+
|summary|         caller_id|     receiver_id|         duration| location|
+-------+------------------+----------------+-----------------+---------+
|  count|              1000|            1000|             1000|     1000|
|   mean|           4909.37|        4972.353|         1820.259|     NULL|
| stddev|2842.5928909084823|2898.02140959681|1043.218673682618|     NULL|
|    min|                 2|              10|                6|Abu Dhabi|
|    max|              9992|            9993|             3599|   Zagreb|
+-------+------------------+----------------+-----------------+---------+

Number of unique values in 'caller_id': 964
Number of unique values in 'receiver_id': 964
Number of unique values in 'duration': 873
Number of unique values in 'location': 95
Correlation between 'caller_id' and 'receiver_id':


0.02570802127433903
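
The cell above reports the correlation for one pair of columns, while the assignment asks for a correlation matrix. The sketch below shows in plain Python what such a matrix contains: the Pearson coefficient for every pair of numeric columns. The function names and the tiny sample are made up for illustration; in the notebook itself, the same table could presumably be assembled by calling df.stat.corr for each pair of numeric columns.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


def correlation_matrix(columns):
    """Map every pair of column names to its Pearson coefficient."""
    names = list(columns)
    return {a: {b: pearson(columns[a], columns[b]) for b in names} for a in names}


# Made-up sample, not taken from the notebook's dataset
sample = {
    "caller_id": [1, 2, 3, 4, 5],
    "receiver_id": [5, 4, 3, 2, 1],
    "duration": [10, 20, 30, 40, 55],
}
matrix = correlation_matrix(sample)
for name, row in matrix.items():
    print(name, [round(row[other], 3) for other in matrix])
```

The diagonal is always 1 and the matrix is symmetric, so only the pairs above the diagonal carry new information.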

In [17]:
from pyspark.sql.functions import hour, when, date_format

# ...

# Extract relevant features before performing the analysis
df = df.withColumn("day_of_week", date_format("timestamp", "E"))
df = df.withColumn(
    "time_of_day",
    when(hour("timestamp").between(0, 11), "Morning")
    .when(hour("timestamp").between(12, 17), "Afternoon")
    .otherwise("Evening"),
)
df = df.withColumn(
    "duration_category",
    when(col("duration") < 300, "Short")
    .when((col("duration") >= 300) & (col("duration") < 900), "Medium")
    .otherwise("Long"),
)

# ...

# Total number of calls, average call duration, and peak call times
total_calls = df.count()
average_duration = df.agg({"duration": "mean"}).collect()[0][0]
peak_call_times = df.groupBy("time_of_day").count().orderBy("count", ascending=False).first()["time_of_day"]

print("Total Number of Calls:", total_calls)
print("Average Call Duration:", average_duration)
print("Peak Call Time:", peak_call_times)


Total Number of Calls: 1000
Average Call Duration: 1820.259
Peak Call Time: Afternoon
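
The bucketing rules used for time_of_day and duration_category can be checked in isolation. The sketch below restates them as plain Python functions (hypothetical helpers, not part of the notebook) with the same inclusive hour ranges and the same duration thresholds. The sample rows shown earlier all carry a 17:40 timestamp, which these rules place in the "Afternoon" bucket.

```python
def time_of_day(hour):
    """Mirror the notebook's rule: 0-11 Morning, 12-17 Afternoon, otherwise Evening."""
    if 0 <= hour <= 11:
        return "Morning"
    if 12 <= hour <= 17:
        return "Afternoon"
    return "Evening"


def duration_category(seconds):
    """Mirror the notebook's rule: under 300 s Short, under 900 s Medium, otherwise Long."""
    if seconds < 300:
        return "Short"
    if seconds < 900:
        return "Medium"
    return "Long"


# Check the boundary values of each bucket
for hour in (0, 11, 12, 17, 18, 23):
    print(hour, time_of_day(hour))
for seconds in (299, 300, 899, 900):
    print(seconds, duration_category(seconds))
```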


In [18]:
# Most frequent callers and receivers
most_frequent_callers = df.groupBy("caller_id").count().orderBy("count", ascending=False).limit(5)
most_frequent_receivers = df.groupBy("receiver_id").count().orderBy("count", ascending=False).limit(5)

print("Most Frequent Callers:")
most_frequent_callers.show()

print("Most Frequent Receivers:")
most_frequent_receivers.show()

Most Frequent Callers:
+---------+-----+
|caller_id|count|
+---------+-----+
|     2114|    3|
|     7250|    2|
|     3742|    2|
|     1952|    2|
|     7429|    2|
+---------+-----+

Most Frequent Receivers:
+-----------+-----+
|receiver_id|count|
+-----------+-----+
|       6323|    3|
|         49|    2|
|       9474|    2|
|       9513|    2|
|       3740|    2|
+-----------+-----+



In [19]:
# Identifying unusual call patterns or anomalies
# Durations were generated in the range 1..3600 seconds, so this filter cannot match any row
anomalies = df.filter(col("duration") > 3600)  # Filter calls with duration more than 1 hour
print("Anomalies (Calls with Duration > 1 hour):")
anomalies.show()

Anomalies (Calls with Duration > 1 hour):
+---------+-----------+--------+---------+--------+-----------+-----------+-----------------+
|caller_id|receiver_id|duration|timestamp|location|day_of_week|time_of_day|duration_category|
+---------+-----------+--------+---------+--------+-----------+-----------+-----------------+
+---------+-----------+--------+---------+--------+-----------+-----------+-----------------+
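
A fixed one-hour threshold returns nothing here because no generated duration exceeds 3600 seconds. A data-driven rule is one alternative. The sketch below applies the common interquartile-range (IQR) fence to a made-up list of durations; the function name, the factor k=1.5, and the sample values are assumptions for illustration. In Spark, the quartiles could presumably be obtained with DataFrame.approxQuantile instead. On uniformly random durations like the ones generated in this notebook, this rule may also flag nothing.

```python
import statistics


def iqr_outliers(values, k=1.5):
    """Return the values lying more than k * IQR outside the first or third quartile."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < lower or v > upper]


# Made-up durations in seconds: mostly short calls plus one very long call
durations = [120, 95, 130, 110, 105, 125, 115, 100, 3400]
print("Outlier durations:", iqr_outliers(durations))
```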



In [22]:
# # Plotting the distribution of call durations using Spark's built-in histogram function
# df.select("duration").rdd.flatMap(lambda x: x).histogram(10)
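
When given an integer bucket count, the RDD histogram call in the commented-out cell returns evenly spaced bucket boundaries between the minimum and maximum value, together with the count per bucket. The sketch below reproduces that bucketing in plain Python and prints a text bar chart. The function name and sample durations are hypothetical, and this is an illustration of the idea rather than Spark's implementation.

```python
def equal_width_histogram(values, n_buckets):
    """Split [min, max] into n_buckets equal-width buckets and count the values in each."""
    lo, hi = min(values), max(values)
    if lo == hi:
        # Degenerate case: all values are identical, so use a single bucket
        return [lo, hi], [len(values)]
    width = (hi - lo) / n_buckets
    boundaries = [lo + i * width for i in range(n_buckets)] + [hi]
    counts = [0] * n_buckets
    for v in values:
        # The last bucket is closed on the right so that the maximum is counted
        index = min(int((v - lo) / width), n_buckets - 1)
        counts[index] += 1
    return boundaries, counts


# Made-up durations in seconds
durations = [60, 120, 300, 600, 900, 1800, 3600]
boundaries, counts = equal_width_histogram(durations, 3)
for left, right, count in zip(boundaries, boundaries[1:], counts):
    print(f"{left:7.0f} - {right:7.0f} | {'#' * count}")
```

The boundaries and counts can also be passed to an external plotting tool to draw a bar chart.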

In [21]:
spark.stop()