In [None]:
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# CDR Analysis using PySpark\n",
    "\n",
    "This notebook demonstrates how to analyze Call Detail Records (CDR) using PySpark."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Setup and Data Loading"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "import os\n",
    "import pandas as pd\n",
    "import matplotlib.pyplot as plt\n",
    "import seaborn as sns\n",
    "\n",
    "# Add the parent directory to the path\n",
    "sys.path.insert(0, '..')\n",
    "\n",
    "# Import project modules\n",
    "from utils.spark_session import create_spark_session\n",
    "from scripts.read_data import read_cdr_data, define_schema\n",
    "from scripts.preprocess import preprocess_cdr_data, validate_data_quality\n",
    "\n",
    "# Create Spark session\n",
    "spark = create_spark_session(\"CDR Analysis Notebook\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Check if data exists, if not generate it\n",
    "if not os.path.exists(\"../data/call_logs.csv\"):\n",
    "    print(\"Generating sample data...\")\n",
    "    from scripts.data_generator import generate_sample_cdr_data\n",
    "    generate_sample_cdr_data(\"../data/call_logs.csv\", num_records=10000)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the data\n",
    "raw_df = read_cdr_data(spark, \"../data/call_logs.csv\")\n",
    "raw_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Display sample data\n",
    "raw_df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Data Preprocessing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Preprocess the data\n",
    "df = preprocess_cdr_data(raw_df)\n",
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Validate data quality\n",
    "validate_data_quality(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Cache the data for better performance\n",
    "df.cache()\n",
    "\n",
    "# Create a temporary view for SQL queries\n",
    "df.createOrReplaceTempView(\"cdr_data\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Data Exploration"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Get basic statistics\n",
    "from pyspark.sql.functions import count, countDistinct\n",
    "\n",
    "total_records = df.count()\n",
    "unique_callers = df.select(countDistinct(\"caller_number\")).collect()[0][0]\n",
    "unique_callees = df.select(countDistinct(\"callee_number\")).collect()[0][0]\n",
    "\n",
    "print(f\"Total records: {total_records}\")\n",
    "print(f\"Unique callers: {unique_callers}\")\n",
    "print(f\"Unique callees: {unique_callees}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Call type distribution\n",
    "call_type_dist = df.groupBy(\"call_type\").count().toPandas()\n",
    "\n",
    "plt.figure(figsize=(10, 6))\n",
    "plt.bar(call_type_dist[\"call_type\"], call_type_dist[\"count\"])\n",
    "plt.title(\"Call Type Distribution\")\n",
    "plt.xlabel(\"Call Type\")\n",
    "plt.ylabel(\"Count\")\n",
    "plt.grid(axis=\"y\", linestyle=\"--\", alpha=0.7)\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Call status distribution\n",
    "status_dist = df.groupBy(\"status\").count().toPandas()\n",
    "\n",
    "plt.figure(figsize=(10, 6))\n",
    "plt.pie(status_dist[\"count\"], labels=status_dist[\"status\"], autopct=\"%1.1f%%\", startangle=90)\n",
    "plt.title(\"Call Status Distribution\")\n",
    "plt.axis(\"equal\")  # Equal aspect ratio ensures that pie is drawn as a circle\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4. User Analysis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import sum as spark_sum, avg, desc\n",
    "\n",
    "# Group by caller and calculate metrics\n",
    "user_activity = (df.groupBy(\"caller_number\")\n",
    "                 .agg(\n",
    "                     count(\"*\").alias(\"total_calls\"),\n",
    "                     spark_sum(\"effective_duration\").alias(\"total_duration_seconds\"),\n",
    "                     avg(\"effective_duration\").alias(\"avg_duration_seconds\"),\n",
    "                     spark_sum(\"call_success\").alias(\"successful_calls\"),\n",
    "                     spark_sum(\"cost\").alias(\"total_cost\")\n",
    "                 )\n",
    "                 .withColumn(\"call_success_rate\", \n",
    "                             col(\"successful_calls\") / col(\"total_calls\")))\n",
    "\n",
    "# Show top users by call volume\n",
    "user_activity.orderBy(desc(\"total_calls\")).show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Convert to Pandas for visualization\n",
    "top_users = user_activity.orderBy(desc(\"total_calls\")).limit(10).toPandas()\n",
    "\n",
    "plt.figure(figsize=(12, 6))\n",
    "plt.bar(top_users[\"caller_number\"].astype(str), top_users[\"total_calls\"])\n",
    "plt.title(\"Top 10 Users by Call Volume\")\n",
    "plt.xlabel(\"Caller Number\")\n",
    "plt.ylabel(\"Total Calls\")\n",
    "plt.xticks(rotation=45)\n",
    "plt.grid(axis=\"y\", linestyle=\"--\", alpha=0.7)\n",
    "plt.tight_layout()\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5. Time-based Analysis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Hourly patterns\n",
    "hourly_patterns = (df.groupBy(\"call_hour\")\n",
    "                  .agg(\n",
    "                      count(\"*\").alias(\"total_calls\"),\n",
    "                      spark_sum(\"effective_duration\").alias(\"total_duration_seconds\")\n",
    "                  )\n",
    "                  .orderBy(\"call_hour\"))\n",
    "\n",
    "# Convert to Pandas for visualization\n",
    "hourly_df = hourly_patterns.toPandas()\n",
    "\n",
    "plt.figure(figsize=(12, 6))\n",
    "plt.bar(hourly_df[\"call_hour\"], hourly_df[\"total_calls\"])\n",
    "plt.title(\"Call Volume by Hour of Day\")\n",
    "plt.xlabel(\"Hour of Day\")\n",
    "plt.ylabel(\"Total Calls\")\n",
    "plt.xticks(range(0, 24))\n",
    "plt.grid(axis=\"y\", linestyle=\"--\", alpha=0.7)\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 6. Operator Analysis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Operator performance\n",
    "operator_perf = (df.groupBy(\"caller_operator\")\n",
    "                .agg(\n",
    "                    count(\"*\").alias(\"total_calls\"),\n",
    "                    spark_sum(\"call_success\").alias(\"successful_calls\")\n",
    "                )\n",
    "                .withColumn(\"success_rate\", col(\"successful_calls\") / col(\"total_calls\")))\n",
    "\n",
    "operator_perf.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Convert to Pandas for visualization\n",
    "op_df = operator_perf.toPandas()\n",
    "\n",
    "plt.figure(figsize=(10, 6))\n",
    "plt.bar(op_df[\"caller_operator\"], op_df[\"success_rate\"] * 100)\n",
    "plt.title(\"Call Success Rate by Operator\")\n",
    "plt.xlabel(\"Operator\")\n",
    "plt.ylabel(\"Success Rate (%)\")\n",
    "plt.ylim(0, 100)\n",
    "plt.grid(axis=\"y\", linestyle=\"--\", alpha=0.7)\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 7. SQL Queries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Example SQL query: Top 5 callers by total cost\n",
    "query = \"\"\"\n",
    "SELECT \n",
    "    caller_number,\n",
    "    SUM(cost) as total_cost,\n",
    "    COUNT(*) as total_calls,\n",
    "    SUM(cost) / COUNT(*) as avg_cost_per_call\n",
    "FROM cdr_data\n",
    "GROUP BY caller_number\n",
    "ORDER BY total_cost DESC\n",
    "LIMIT 5\n",
    "\"\"\"\n",
    "\n",
    "spark.sql(query).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Example SQL query: Call duration distribution by call type\n",
    "query = \"\"\"\n",
    "SELECT \n",
    "    call_type,\n",
    "    COUNT(*) as total_calls,\n",
    "    AVG(duration) as avg_duration,\n",
    "    MIN(duration) as min_duration,\n",
    "    MAX(duration) as max_duration\n",
    "FROM cdr_data\n",
    "GROUP BY call_type\n",
    "\"\"\"\n",
    "\n",
    "spark.sql(query).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 8. Advanced Analysis: Call Network Graph (Optional)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create a network of callers and callees\n",
    "# This is just a preview, full implementation would require NetworkX or similar library\n",
    "call_network = df.select(\"caller_number\", \"callee_number\", \"duration\").limit(100).toPandas()\n",
    "call_network.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 9. Cleanup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Clean up Spark session\n",
    "spark.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}