In [None]:
import polars as pl

# Call Detail Records (CDRs): one row per call, `duration` measured in seconds.
cdr_data = pl.DataFrame(
    {
        "call_id": [101, 102, 103],
        "caller": ["+919876543210", "+918765432109", "+917654321098"],
        "receiver": ["+911234567890", "+919898989898", "+918282828282"],
        "duration": [300, 120, 450],
        "call_type": ["Local", "International", "Roaming"],
    }
)

# Create an empty SQL context and expose the frame under the table name
# `cdr_logs`, which is the name the query below selects from.
sql_context = pl.SQLContext()
sql_context.register("cdr_logs", cdr_data)

# Select the parties and duration of every call longer than 200 seconds.
query = """
    SELECT caller, receiver, duration
    FROM cdr_logs
    WHERE duration > 200
"""

# The query result is materialised with collect() before it is printed.
long_calls = sql_context.execute(query)
result = long_calls.collect()
print(result)


In [None]:
import polars as pl

# Call Detail Records (CDRs); `duration` is expressed in seconds.
cdr_data = pl.DataFrame(
    {
        "call_id": [101, 102, 103, 104],
        "caller": ["+919876543210", "+918765432109", "+917654321098", "+919191919191"],
        "duration": [300, 450, 120, 600],
        "call_type": ["Local", "International", "Local", "Roaming"],
    }
)

# Turn the SQL predicate into a Polars expression first, then use it as the
# row filter: only calls strictly longer than 300 seconds are kept.
long_call_predicate = pl.sql_expr("duration > 300")
high_duration_calls = cdr_data.filter(long_call_predicate)

print(high_duration_calls)


In [None]:
import polars as pl

# Sample telecom call records dataset (several calls per customer).
cdr_data = pl.DataFrame({
    "customer_id": [1, 2, 1, 3, 2, 3, 1],
    "phone_number": ["+919876543210", "+918765432109", "+919876543210",
                     "+917654321098", "+918765432109", "+917654321098", "+919876543210"],
    "duration": [300, 450, 120, 600, 150, 200, 400],  # Duration in seconds
    "call_type": ["Local", "International", "Local", "Roaming", "Local", "International", "Roaming"]
})

# Aggregation: total and average call duration per customer.
# maintain_order=True keeps the groups in order of first appearance in the
# input; without it the row order of the result is not guaranteed and the
# printed table can differ from one run to the next.
aggregated_data = cdr_data.group_by("customer_id", maintain_order=True).agg([
    pl.sql_expr("SUM(duration)").alias("total_call_duration"),
    pl.sql_expr("AVG(duration)").alias("avg_call_duration")
])

# Transformation: label each customer from their total call duration (seconds).
# BETWEEN is inclusive, so exactly 500 and exactly 1000 are 'Moderate User'.
aggregated_data = aggregated_data.with_columns(
    pl.sql_expr("""
        CASE 
            WHEN total_call_duration > 1000 THEN 'Heavy User'
            WHEN total_call_duration BETWEEN 500 AND 1000 THEN 'Moderate User'
            ELSE 'Light User'
        END
    """).alias("user_category")
)

print(aggregated_data)


In [None]:
import polars as pl
import pandas as pd
import pyarrow as pa

# Three inputs keyed by `customer_id`, each held in a different library's
# table type, to show one SQL query spanning all of them.

# Call Detail Records (CDRs) as a Polars DataFrame; durations are in seconds.
cdr_data = pl.DataFrame(
    {
        "customer_id": [1, 2, 3, 4],
        "call_duration": [300, 450, 120, 600],
        "call_type": ["Local", "International", "Local", "Roaming"],
    }
)

# Network performance per customer as a pandas DataFrame.
network_data = pd.DataFrame(
    {
        "customer_id": [1, 2, 3, 4],
        "dropped_calls": [2, 10, 5, 1],
        "network_quality": ["Good", "Poor", "Average", "Excellent"],
    }
)

# Billing information per customer as a PyArrow Table.
billing_data = pa.table(
    {
        "customer_id": [1, 2, 3, 4],
        "total_bill": [50.0, 120.5, 30.0, 200.0],
        "plan": ["Basic", "Premium", "Basic", "Ultra"],
    }
)

# Join the three sources on customer_id, drop customers whose network quality
# is 'Poor', bucket the rest by bill size and list the largest bills first.
# The table names in FROM/JOIN are the Python variable names defined above.
query = """
    SELECT 
        c.customer_id, 
        c.call_duration, 
        n.network_quality, 
        b.total_bill, 
        b.plan,
        CASE 
            WHEN b.total_bill > 100 THEN 'High Value'
            WHEN b.total_bill BETWEEN 50 AND 100 THEN 'Medium Value'
            ELSE 'Low Value'
        END AS customer_segment
    FROM cdr_data c
    JOIN network_data n ON c.customer_id = n.customer_id
    JOIN billing_data b ON c.customer_id = b.customer_id
    WHERE n.network_quality != 'Poor'
    ORDER BY b.total_bill DESC
"""

# No explicit registration step is performed here: pl.sql() is handed only the
# query text, and the result is materialised with collect().
result = pl.sql(query).collect()

print(result)


In [None]:
import sqlite3
import random
import datetime
import polars as pl
from sqlalchemy import create_engine

# Step 1: open (creating it if needed) the SQLite file and make sure the
# `call_logs` table exists. IF NOT EXISTS makes re-running this step harmless.
db_name = "telecom_data.db"
conn = sqlite3.connect(db_name)
cursor = conn.cursor()

create_call_logs_sql = """
    CREATE TABLE IF NOT EXISTS call_logs (
        call_id INTEGER PRIMARY KEY AUTOINCREMENT,
        customer_id INTEGER,
        call_duration INTEGER,  -- in seconds
        call_type TEXT,  -- 'incoming' or 'outgoing'
        timestamp TEXT
    )
"""
cursor.execute(create_call_logs_sql)

# Persist the DDL before any other connection touches the file.
conn.commit()

# Step 2: Generate Random Telecom Data
def generate_random_calls(n=5, seed=None):
    """Build a Polars DataFrame of ``n`` synthetic call-log rows.

    Columns produced:
        customer_id   -- random integer in [1000, 9999]
        call_duration -- random integer in [30, 900] (seconds, i.e. 30 s to 15 min)
        call_type     -- "incoming" or "outgoing", chosen at random
        timestamp     -- current local time formatted as "%Y-%m-%d %H:%M:%S"

    Args:
        n: Number of rows to generate. ``0`` yields an empty, correctly typed frame.
        seed: Optional seed. When given, the random columns are reproducible
            for that seed; when ``None`` (default) values are drawn from the
            module-level ``random`` state exactly as before.

    Returns:
        pl.DataFrame with the four columns listed above.
    """
    # Only switch to a private generator when a seed is requested, so the
    # default call keeps consuming the shared `random` module state.
    rng = random if seed is None else random.Random(seed)

    call_types = ["incoming", "outgoing"]
    data = {
        "customer_id": [rng.randint(1000, 9999) for _ in range(n)],
        "call_duration": [rng.randint(30, 900) for _ in range(n)],  # Duration between 30 sec to 15 min
        "call_type": [rng.choice(call_types) for _ in range(n)],
        "timestamp": [datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") for _ in range(n)]
    }

    # Pin the dtypes: with empty lists (n == 0) Polars cannot infer them and
    # would otherwise produce Null-typed columns.
    schema = {
        "customer_id": pl.Int64,
        "call_duration": pl.Int64,
        "call_type": pl.Utf8,
        "timestamp": pl.Utf8,
    }
    return pl.DataFrame(data, schema=schema)

# Step 3: Insert Initial Data Using Polars write_database()
engine = create_engine(f"sqlite:///{db_name}")

try:
    df_initial = generate_random_calls(5)
    # Each SQLAlchemy connection is opened in a `with` block so it is closed
    # as soon as the operation finishes instead of being leaked.
    with engine.connect() as db_connection:
        df_initial.write_database(
            table_name="call_logs",
            connection=db_connection,
            if_table_exists="append",
        )

    # Step 4: Read Data Using Polars
    query = "SELECT * FROM call_logs"
    with engine.connect() as db_connection:
        df_loaded = pl.read_database(query=query, connection=db_connection)

    print("Initial Data Loaded into Polars DataFrame:")
    print(df_loaded)

    # Step 5: Insert More Records Using write_database()
    df_new = generate_random_calls(3)
    with engine.connect() as db_connection:
        df_new.write_database(
            table_name="call_logs",
            connection=db_connection,
            if_table_exists="append",
        )

    # Reload the data into Polars
    with engine.connect() as db_connection:
        df_updated = pl.read_database(query=query, connection=db_connection)

    print("\nUpdated Data Loaded into Polars DataFrame:")
    print(df_updated)
finally:
    # Release the engine's pooled connections and close the sqlite3
    # connection even if one of the steps above raised.
    engine.dispose()
    conn.close()
