<a href="https://colab.research.google.com/github/Articbug/Telecom-CDR-Analytics-Platform/blob/main/Notebooks/4_CDR_Spark_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark snowflake-connector-python pandas

Collecting snowflake-connector-python
  Downloading snowflake_connector_python-4.3.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (80 kB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m80.2/80.2 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting cryptography>=44.0.1 (from snowflake-connector-python)
  Downloading cryptography-46.0.5-cp311-abi3-manylinux_2_34_x86_64.whl.metadata (5.7 kB)
Collecting sortedcontainers>=2.4.0 (from snowflake-connector-python)
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting boto3>=1.24 (from snowflake-connector-python)
  Downloading boto3-1.42.54-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore>=1.24 (from snowflake-connector-python)
  Downloading botocor

In [None]:
import getpass
import os

import pandas as pd
import snowflake.connector
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# -- Initialize Spark session
# A small shuffle-partition count is configured explicitly instead of relying
# on Spark's default.
spark = (
    SparkSession.builder
    .appName('CDR_Analytics_Pipeline')
    .config('spark.sql.shuffle.partitions', '4')
    .getOrCreate()
)

spark.sparkContext.setLogLevel('ERROR')
print(f'‚úÖ Spark Version: {spark.version}')
print('‚úÖ Spark Session started successfully')

# -- Read data from Snowflake into pandas, then convert to Spark
# SECURITY: the password must never be committed to the notebook. It is read
# from the SNOWFLAKE_PASSWORD environment variable, falling back to an
# interactive prompt. Account and user can be overridden the same way.
# NOTE(review): a password was previously hardcoded in this cell and is part of
# the repository history -- it should be rotated.
print('\nüì• Reading CDR data from Snowflake...')
conn = snowflake.connector.connect(
    account=os.environ.get('SNOWFLAKE_ACCOUNT', 'bopsoxz-lr52214'),
    user=os.environ.get('SNOWFLAKE_USER', 'CHANDANSAHOO'),
    password=os.environ.get('SNOWFLAKE_PASSWORD') or getpass.getpass('Snowflake password: '),
    database='TELECOM_DWH',
    schema='STAGING',
    warehouse='INGEST_WH',
)

# try/finally guarantees the cursor and connection are released even when the
# query or the fetch raises.
try:
    cursor = conn.cursor()
    try:
        cursor.execute('SELECT * FROM TELECOM_DWH.STAGING.STG_CDR')
        rows = cursor.fetchall()
        # cursor.description holds one entry per result column; item 0 is the name.
        columns = [desc[0] for desc in cursor.description]
    finally:
        cursor.close()
finally:
    conn.close()

pandas_df = pd.DataFrame(rows, columns=columns)

# -- Convert to Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)
print(f'‚úÖ Loaded {spark_df.count():,} records into Spark DataFrame')
print(f'‚úÖ Partitions: {spark_df.rdd.getNumPartitions()}')
spark_df.printSchema()

‚úÖ Spark Version: 4.0.2
‚úÖ Spark Session started successfully

üì• Reading CDR data from Snowflake...
‚úÖ Loaded 50,000 records into Spark DataFrame
‚úÖ Partitions: 2
root
 |-- CALL_ID: string (nullable = true)
 |-- CALLING_NUMBER: string (nullable = true)
 |-- CALLED_NUMBER: string (nullable = true)
 |-- CALL_START_TIME: timestamp (nullable = true)
 |-- CALL_END_TIME: timestamp (nullable = true)
 |-- DURATION_SECONDS: long (nullable = true)
 |-- CALL_TYPE: string (nullable = true)
 |-- CELL_ID: string (nullable = true)
 |-- TERMINATION_CD: string (nullable = true)
 |-- IS_ROAMING: boolean (nullable = true)
 |-- CHARGE_AMOUNT: decimal(38,18) (nullable = true)
 |-- DATA_VOLUME_MB: decimal(38,18) (nullable = true)
 |-- NETWORK_TYPE: string (nullable = true)
 |-- IS_FRAUD: boolean (nullable = true)



In [None]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

print('üîÑ BATCH PROCESSING: Applying Spark Transformations...')
print('=' * 55)

# ‚îÄ‚îÄ Transformation 1: Add derived columns
df_transformed = spark_df \
    .withColumn('CALL_HOUR',    hour('CALL_START_TIME')) \
    .withColumn('CALL_DATE',    to_date('CALL_START_TIME')) \
    .withColumn('CALL_MONTH',   month('CALL_START_TIME')) \
    .withColumn('CALL_YEAR',    year('CALL_START_TIME')) \
    .withColumn('IS_PEAK_HOUR', when(
        (col('CALL_HOUR').between(8, 11)) |
        (col('CALL_HOUR').between(17, 20)), True
    ).otherwise(False)) \
    .withColumn('CHARGE_AMOUNT', col('CHARGE_AMOUNT').cast('double')) \
    .withColumn('DATA_VOLUME_MB', col('DATA_VOLUME_MB').cast('double'))

print('‚úÖ Transformation 1: Derived columns added (hour, date, month, peak hour)')

# ‚îÄ‚îÄ Transformation 2: Revenue segmentation
df_transformed = df_transformed.withColumn(
    'REVENUE_SEGMENT',
    when(col('CHARGE_AMOUNT') >= 5.0,  'HIGH')
    .when(col('CHARGE_AMOUNT') >= 1.0, 'MEDIUM')
    .otherwise('LOW')
)
print('‚úÖ Transformation 2: Revenue segments assigned (HIGH/MEDIUM/LOW)')

# ‚îÄ‚îÄ Transformation 3: Window function - Running total per subscriber
window_spec = Window.partitionBy('CALLING_NUMBER').orderBy('CALL_START_TIME')
df_transformed = df_transformed \
    .withColumn('RUNNING_REVENUE', sum('CHARGE_AMOUNT').over(window_spec)) \
    .withColumn('CALL_RANK',       rank().over(window_spec))
print('‚úÖ Transformation 3: Running revenue + call rank per subscriber added')

# ‚îÄ‚îÄ Transformation 4: Flag high value calls
df_transformed = df_transformed.withColumn(
    'IS_HIGH_VALUE',
    when(col('CHARGE_AMOUNT') > col('CHARGE_AMOUNT').cast('double'), True)
    .otherwise(False)
)
print('‚úÖ Transformation 4: High value call flags added')

print(f'\nüìä Transformed DataFrame: {df_transformed.count():,} records')
print(f'   Columns: {len(df_transformed.columns)}')
df_transformed.show(5, truncate=False)

üîÑ BATCH PROCESSING: Applying Spark Transformations...
‚úÖ Transformation 1: Derived columns added (hour, date, month, peak hour)
‚úÖ Transformation 2: Revenue segments assigned (HIGH/MEDIUM/LOW)
‚úÖ Transformation 3: Running revenue + call rank per subscriber added
‚úÖ Transformation 4: High value call flags added

üìä Transformed DataFrame: 50,000 records
   Columns: 23
+-----------+--------------+-------------+-------------------+-------------------+----------------+---------+----------+--------------+----------+-------------+--------------+------------+--------+---------+----------+----------+---------+------------+---------------+---------------+---------+-------------+
|CALL_ID    |CALLING_NUMBER|CALLED_NUMBER|CALL_START_TIME    |CALL_END_TIME      |DURATION_SECONDS|CALL_TYPE|CELL_ID   |TERMINATION_CD|IS_ROAMING|CHARGE_AMOUNT|DATA_VOLUME_MB|NETWORK_TYPE|IS_FRAUD|CALL_HOUR|CALL_DATE |CALL_MONTH|CALL_YEAR|IS_PEAK_HOUR|REVENUE_SEGMENT|RUNNING_REVENUE|CALL_RANK|IS_HIGH_VALUE|
+---

In [None]:
from pyspark.sql.functions import count_distinct

print('üìä SPARK ANALYTICS: Running Aggregations...')
print('=' * 55)

# ‚îÄ‚îÄ Analysis 1: Revenue by Call Type
print('\n1Ô∏è‚É£  Revenue by Call Type:')
df_transformed.groupBy('CALL_TYPE') \
    .agg(
        count('*').alias('total_calls'),
        round(sum('CHARGE_AMOUNT'), 2).alias('total_revenue'),
        round(avg('DURATION_SECONDS'), 1).alias('avg_duration')
    ) \
    .orderBy(desc('total_revenue')) \
    .show()

# ‚îÄ‚îÄ Analysis 2: Peak vs Off-Peak Traffic
print('2Ô∏è‚É£  Peak vs Off-Peak Traffic:')
df_transformed.groupBy('IS_PEAK_HOUR') \
    .agg(
        count('*').alias('total_calls'),
        round(sum('CHARGE_AMOUNT'), 2).alias('total_revenue'),
        round(avg('CHARGE_AMOUNT'), 4).alias('avg_charge')
    ) \
    .orderBy(desc('total_calls')) \
    .show()

# ‚îÄ‚îÄ Analysis 3: Network Type Performance
print('3Ô∏è‚É£  Network Type Performance:')
df_transformed.groupBy('NETWORK_TYPE') \
    .agg(
        count('*').alias('total_calls'),
        round(sum('CHARGE_AMOUNT'), 2).alias('total_revenue'),
        count(when(col('TERMINATION_CD') == 'DROPPED', 1)).alias('dropped_calls')
    ) \
    .orderBy(desc('total_calls')) \
    .show()

# ‚îÄ‚îÄ Analysis 4: Monthly Revenue Trend
print('4Ô∏è‚É£  Monthly Revenue Trend:')
df_transformed.groupBy('CALL_MONTH') \
    .agg(
        count('*').alias('total_calls'),
        round(sum('CHARGE_AMOUNT'), 2).alias('total_revenue'),
        count_distinct('CALLING_NUMBER').alias('unique_customers')
    ) \
    .orderBy('CALL_MONTH') \
    .show(12)

# ‚îÄ‚îÄ Analysis 5: Fraud Summary
print('5Ô∏è‚É£  Fraud Summary:')
df_transformed.groupBy('IS_FRAUD') \
    .agg(
        count('*').alias('total_calls'),
        round(sum('CHARGE_AMOUNT'), 2).alias('total_revenue')
    ) \
    .show()

print('‚úÖ All Spark Analytics completed!')

üìä SPARK ANALYTICS: Running Aggregations...

1Ô∏è‚É£  Revenue by Call Type:
+---------+-----------+-------------+------------+
|CALL_TYPE|total_calls|total_revenue|avg_duration|
+---------+-----------+-------------+------------+
|    VOICE|      27405|     41652.23|       179.1|
|    VIDEO|       2533|     12726.12|       294.3|
|     DATA|       7605|       3758.0|       609.6|
|      SMS|      12457|       1269.2|         0.0|
+---------+-----------+-------------+------------+

2Ô∏è‚É£  Peak vs Off-Peak Traffic:
+------------+-----------+-------------+----------+
|IS_PEAK_HOUR|total_calls|total_revenue|avg_charge|
+------------+-----------+-------------+----------+
|        true|      27147|     32468.01|     1.196|
|       false|      22853|     26937.53|    1.1787|
+------------+-----------+-------------+----------+

3Ô∏è‚É£  Network Type Performance:
+------------+-----------+-------------+-------------+
|NETWORK_TYPE|total_calls|total_revenue|dropped_calls|
+------------+------

In [None]:
# `from pyspark.sql.functions import *` in the earlier cells rebinds the name
# `round` to the Spark column function. Keep an explicit handle on Python's
# built-in `round` so plain Python floats can still be rounded later on.
import builtins
builtins_round = builtins.round

In [None]:
import time
import random
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import col, when, sum as spark_sum, count

print('üåä SPARK STREAMING SIMULATION')
print('=' * 55)
print('Simulating real-time CDR records arriving every second...\n')

# ‚îÄ‚îÄ Define schema
schema = StructType([
    StructField('CALL_ID',          StringType(),  True),
    StructField('CALLING_NUMBER',   StringType(),  True),
    StructField('CALLED_NUMBER',    StringType(),  True),
    StructField('CALL_TYPE',        StringType(),  True),
    StructField('DURATION_SECONDS', IntegerType(), True),
    StructField('CHARGE_AMOUNT',    DoubleType(),  True),
    StructField('NETWORK_TYPE',     StringType(),  True),
    StructField('IS_FRAUD',         BooleanType(), True),
    StructField('TIMESTAMP',        StringType(),  True),
])

PHONES     = ['919199123456', '918877654321', '917766543210',
              '916655432109', '919988776655']
CALL_TYPES = ['VOICE', 'SMS', 'DATA', 'VIDEO']
NETWORKS   = ['2G', '3G', '4G', '5G']

total_processed = 0
total_revenue   = 0.0
fraud_detected  = 0

print(f'{"Batch":<8}{"Records":<10}{"Revenue":<12}{"Fraud":<8}{"Time"}')
print('-' * 55)

for batch_num in range(1, 11):
    batch_records = []
    for i in range(10):
        call_type = random.choice(CALL_TYPES)
        duration  = random.randint(0, 300)
        charge    = builtins_round(random.uniform(0.1, 10.0), 2)
        is_fraud  = random.random() < 0.05

        batch_records.append((
            f'STREAM_{batch_num:03d}_{i:03d}',
            random.choice(PHONES),
            random.choice(PHONES),
            call_type,
            duration,
            charge,
            random.choice(NETWORKS),
            is_fraud,
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        ))

    batch_df = spark.createDataFrame(batch_records, schema)

    batch_df = batch_df \
        .withColumn('REVENUE_FLAG',
            when(col('CHARGE_AMOUNT') > 5.0, 'HIGH')
            .when(col('CHARGE_AMOUNT') > 2.0, 'MEDIUM')
            .otherwise('LOW')) \
        .withColumn('FRAUD_ALERT',
            when(col('IS_FRAUD') == True, 'FRAUD DETECTED')
            .otherwise('NORMAL'))

    batch_stats = batch_df.agg(
        count('*').alias('cnt'),
        spark_sum('CHARGE_AMOUNT').alias('rev'),
        spark_sum(when(col('IS_FRAUD') == True, 1).otherwise(0)).alias('fraud')
    ).collect()[0]

    total_processed += batch_stats['cnt']
    total_revenue   += float(batch_stats['rev'])
    fraud_detected  += int(batch_stats['fraud'])

    rev = builtins_round(float(batch_stats['rev']), 2)
    print(f'{batch_num:<8}{batch_stats["cnt"]:<10}‚Çπ{rev:<11}{batch_stats["fraud"]:<8}{datetime.now().strftime("%H:%M:%S")}')

    frauds = batch_df.filter(col('IS_FRAUD') == True).select(
        'CALL_ID', 'CALLING_NUMBER', 'CHARGE_AMOUNT'
    ).collect()
    for f in frauds:
        print(f'   ‚ö†Ô∏è  ALERT: {f["CALL_ID"]} | {f["CALLING_NUMBER"]} | ‚Çπ{f["CHARGE_AMOUNT"]}')

    time.sleep(1)

print('-' * 55)
print(f'\nüìä STREAMING SUMMARY:')
print(f'   Total Records Processed: {total_processed}')
print(f'   Total Revenue:           ‚Çπ{builtins_round(total_revenue, 2)}')
print(f'   Fraud Alerts:            {fraud_detected}')
print(f'\n‚úÖ Spark Streaming Simulation Complete!')

üåä SPARK STREAMING SIMULATION
Simulating real-time CDR records arriving every second...

Batch   Records   Revenue     Fraud   Time
-------------------------------------------------------
1       10        ‚Çπ64.78      2       12:13:04
   ‚ö†Ô∏è  ALERT: STREAM_001_001 | 919199123456 | ‚Çπ8.42
   ‚ö†Ô∏è  ALERT: STREAM_001_009 | 919988776655 | ‚Çπ8.59
2       10        ‚Çπ50.14      1       12:13:07
   ‚ö†Ô∏è  ALERT: STREAM_002_001 | 918877654321 | ‚Çπ5.29
3       10        ‚Çπ55.13      0       12:13:09
4       10        ‚Çπ51.96      1       12:13:11
   ‚ö†Ô∏è  ALERT: STREAM_004_008 | 916655432109 | ‚Çπ4.48
5       10        ‚Çπ55.82      2       12:13:13
   ‚ö†Ô∏è  ALERT: STREAM_005_003 | 919988776655 | ‚Çπ2.49
   ‚ö†Ô∏è  ALERT: STREAM_005_009 | 919988776655 | ‚Çπ9.15
6       10        ‚Çπ45.7       1       12:13:15
   ‚ö†Ô∏è  ALERT: STREAM_006_001 | 918877654321 | ‚Çπ5.52
7       10        ‚Çπ50.32      0       12:13:17
8       10        ‚Çπ53.36      1       12:13:20
   ‚ö†Ô∏è  A