# **Course Outline**

## **Phase 1: Data Handling & Storage**
<hr></hr>

### **Working with Data Formats**
- CSV, JSON, Parquet, Avro
- Handling large files efficiently
- Data serialization and deserialization

### **Relational Databases & SQL**
- SQL basics (CRUD operations, joins, indexing)
- Advanced SQL (window functions, CTEs, query optimization)
- PostgreSQL and Oracle ADW hands-on practice

### **NoSQL Databases**
- Introduction to NoSQL (when to use it)
- MongoDB (document-based storage)
- Redis (key-value store)
- Cassandra (wide-column storage)


## **Phase 2: Data Ingestion & Processing**
<hr></hr>

### **Batch Data Processing**
- Pandas and Polars for data manipulation
- PySpark basics (RDDs, DataFrames, Transformations & Actions)
- Writing ETL pipelines with PySpark

### **Real-time Data Processing**
- Introduction to streaming vs batch processing
- Kafka for real-time messaging
- Apache Flink and Spark Streaming basics

### **Web Scraping & APIs**
- Scraping with BeautifulSoup & Selenium
- REST & GraphQL APIs
- Handling rate limits, authentication, and pagination

## **Phase 3: Workflow Orchestration & Automation**
<hr></hr>

### **Airflow for Workflow Orchestration**
- DAGs, Operators, Tasks
- Scheduling & monitoring workflows
- Integrating Airflow with databases and cloud storage

### **Data Pipelines & CI/CD**
- Building modular data pipelines
- Testing data pipelines
- CI/CD for data engineering (GitHub Actions, Azure Pipelines)

## **Phase 4: Cloud & Infrastructure**
<hr></hr>

### **Cloud Data Engineering**

- Storing data in cloud storage (Azure Blob, AWS S3, GCP Storage)
- Managed data services (BigQuery, Snowflake, Redshift)
- Hands-on with Oracle ADW in OCI

### **Infrastructure as Code (IaC)**
- Terraform basics for cloud provisioning
- Automating deployments in Azure/GCP/AWS

## **Phase 5: Data Governance & Analytics**
<hr></hr>

### **Data Governance & Security**
- Data privacy laws (GDPR, CCPA)
- Role-based access control (RBAC)
- Data lineage & cataloging tools (Apache Atlas, DataHub)

### **Data Warehousing & BI**
- Data modeling (star schema, snowflake schema)
- Building dashboards with Plotly & Power BI
- Connecting data warehouses to BI tools

## **Final Project**
<hr></hr>

**Building a complete data pipeline**

- Extracting stock and financial data (WSJ & PSE)
- Storing in Oracle ADW
- Processing with PySpark
- Visualizing trends using Plotly or Power BI
- Automating workflows with Airflow

# **Phase 1: Data Handling & Storage**
<hr></hr>

## **Working with Data Formats**
<hr></hr>

### **Overview of Data Formats**

<div style="float: left;">

| Format | Structure | Pros | Cons |
|------|---------------|--------------|--------------|
| **CSV** | Row-based text | Easy to read, widely supported | No schema or type information; large and slow to scan at scale |
| **JSON** | Semi-structured (key-value) | Flexible, widely used in APIs | Verbose; must be parsed before use |
| **Parquet** | Columnar binary | Efficient storage, great for analytics | Not human-readable |
| **Avro** | Row-based binary | Supports schema evolution, fast writes | Requires special tools to read |
</div>

### **Handling CSV Files Efficiently**

In [None]:
!pip install pandas

In [16]:
import pandas as pd

df = pd.read_csv("../data/MEG.csv")
print(df)

            Date   Open   High    Low   Close        Volume
0       10/27/23  1.960  1.970  1.940   1.960  6.800000e+06
1       10/26/23  1.970  1.990  1.950   1.950  1.296500e+07
2       10/25/23  2.000  2.030  2.000   2.010  6.178000e+06
3       10/24/23  2.000  2.020  1.990   2.000  7.524000e+06
4       10/23/23  2.010  2.020  2.000   2.000  8.299000e+06
...          ...    ...    ...    ...     ...           ...
6923  01/09/1995  2.314  2.314  2.288   2.314  1.387800e+07
6924  01/06/1995  2.314  2.341  2.314   2.314  2.407262e+07
6925  01/05/1995  2.288  2.314  2.288   2.288  3.185465e+08
6926  01/04/1995  2.314  2.314  2.288   2.314  2.231886e+07
6927  01/03/1995  2.341  2.367  2.314   2.341  3.149516e+08

[6928 rows x 6 columns]


In [18]:
# For large files

In [21]:
chunks = pd.read_csv("../data/MEG.csv", chunksize=1000)  # Returns an iterator of DataFrames, not a DataFrame
for chunk in chunks:
    print(chunk.head())  # Process each chunk separately

       Date   Open   High   Low   Close    Volume
0  10/27/23   1.96   1.97  1.94    1.96   6800000
1  10/26/23   1.97   1.99  1.95    1.95  12965000
2  10/25/23   2.00   2.03  2.00    2.01   6178000
3  10/24/23   2.00   2.02  1.99    2.00   7524000
4  10/23/23   2.01   2.02  2.00    2.00   8299000
          Date   Open   High   Low   Close    Volume
1000  09/27/19   4.79   4.81  4.46    4.46  92220000
1001  09/26/19   4.74   4.87  4.70    4.79  17581000
1002  09/25/19   4.97   4.98  4.70    4.72  26913000
1003  09/24/19   4.85   4.98  4.84    4.97  16350000
1004  09/23/19   4.95   5.00  4.80    4.81  17000000
          Date   Open   High   Low   Close    Volume
2000  08/19/15   4.30   4.38  4.20    4.35  56249000
2001  08/18/15   4.45   4.47  4.27    4.28  30480000
2002  08/17/15   4.62   4.66  4.39    4.49  38611000
2003  08/14/15   4.56   4.61  4.56    4.60  30808000
2004  08/13/15   4.55   4.62  4.50    4.55  31808000
            Date   Open   High   Low   Close    Volume
3000  07/

**`chunksize`**

- When working with large CSV files, loading everything into memory at once may cause high RAM usage and slow performance.
- `chunksize` allows reading the file in smaller parts, reducing memory consumption.
- Instead of returning a full DataFrame, `pd.read_csv()` then returns an **iterator** that yields one DataFrame chunk at a time.
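
To make the iterator behaviour concrete, here is a minimal sketch that streams a small in-memory CSV in chunks and keeps only running totals. The sample data and the column names `price` and `qty` are made up for illustration; with a real file you would pass its path instead of the `StringIO` buffer.

```python
import io
import pandas as pd

# Hypothetical sample data standing in for a large CSV file (10 rows)
csv_text = "price,qty\n" + "\n".join(f"{i},{i % 3}" for i in range(1, 11))

# With chunksize set, read_csv returns an iterator rather than a DataFrame
reader = pd.read_csv(io.StringIO(csv_text), chunksize=4)

total_rows = 0
price_sum = 0.0
chunk_sizes = []
for chunk in reader:
    # Each chunk is an ordinary DataFrame holding at most 4 rows
    chunk_sizes.append(len(chunk))
    total_rows += len(chunk)
    price_sum += chunk["price"].sum()

# Only the running totals are kept in memory, never the whole file
mean_price = price_sum / total_rows
print(chunk_sizes, total_rows, mean_price)
```

The same pattern works for any aggregate that can be updated incrementally (counts, sums, minima, maxima).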

**Common Use Cases for `chunksize`**

In [26]:
#Counting Rows in a Large File Efficiently
total_rows = 0
for chunk in pd.read_csv("../data/MEG.csv", chunksize=10000):
    total_rows += len(chunk)

print(f"Total Rows: {total_rows}")


Total Rows: 6928


In [38]:
# Filtering Data While Reading (Memory Efficient)

filtered_data = []

for chunk in pd.read_csv("../data/MEG.csv", chunksize=5000):
    # Note: the column header in this file has a leading space (" Volume")
    filtered_chunk = chunk[chunk[" Volume"] > 10000000]
    filtered_data.append(filtered_chunk)

df_filtered = pd.concat(filtered_data)  # Merge all chunks together
df_filtered.shape

(4634, 6)

### **Working with JSON Data**

- `json.load()` loads JSON into a Python list/dict structure
- This format is flexible but can be inefficient for large-scale storage

In [7]:
import json

with open("../data/out.json") as f:
    data = json.load(f)

print(data)  # Prints the parsed JSON (here, a nested dictionary)


{'documentMetadata': {'pageCount': 1, 'mimeType': 'image/png'}, 'pages': [{'pageNumber': 1, 'dimensions': {'width': 940.0, 'height': 529.0, 'unit': 'PIXEL'}, 'detectedDocumentTypes': None, 'detectedLanguages': None, 'words': [{'text': 'May', 'confidence': 0.9234257, 'boundingPolygon': {'normalizedVertices': [{'x': 0.00851063829787234, 'y': 0.011342155009451797}, {'x': 0.0425531914893617, 'y': 0.011342155009451797}, {'x': 0.0425531914893617, 'y': 0.03780718336483932}, {'x': 0.00851063829787234, 'y': 0.03780718336483932}]}}, {'text': '3', 'confidence': 0.9234257, 'boundingPolygon': {'normalizedVertices': [{'x': 0.045744680851063826, 'y': 0.011342155009451797}, {'x': 0.05638297872340425, 'y': 0.011342155009451797}, {'x': 0.05638297872340425, 'y': 0.03780718336483932}, {'x': 0.045744680851063826, 'y': 0.03780718336483932}]}}, {'text': 'May', 'confidence': 0.9384263, 'boundingPolygon': {'normalizedVertices': [{'x': 0.18829787234042553, 'y': 0.011342155009451797}, {'x': 0.22127659574468084, 

In [51]:
import pandas as pd

# Load JSON file
file_path = "../data/expenses.json"
data = pd.read_json(file_path)
data

| | id | date | category | amount | currency | description | payment_method |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 2025-03-04 | Food | 12.5 | USD | Lunch at a restaurant | Credit Card |
| 1 | 2 | 2025-03-03 | Transport | 5.0 | USD | Bus fare | Cash |
| 2 | 3 | 2025-03-02 | Groceries | 45.3 | USD | Weekly grocery shopping | Debit Card |
| 3 | 4 | 2025-03-01 | Entertainment | 15.99 | USD | Movie ticket | Credit Card |
| 4 | 5 | 2025-02-28 | Utilities | 80.75 | USD | Electricity bill | Bank Transfer |


In [60]:
# If the JSON is nested, flatten it into rows manually
import json
import pandas as pd

# Load JSON from file
with open("../data/out.json", "r") as file:
    data = json.load(file)

# Extract words and confidence per page
rows = []
for page_idx, page in enumerate(data.get("pages", []), start=1):
    for word in page.get("words", []):
        rows.append({"page": page_idx, "text": word["text"], "confidence": float(word["confidence"])})

# Convert to DataFrame
df = pd.DataFrame(rows)
df

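| | page | text | confidence |
|---|---|---|---|
| 0 | 1 | May | 0.923426 |
| 1 | 1 | 3 | 0.923426 |
| 2 | 1 | May | 0.938426 |
| 3 | 1 | 3 | 0.938426 |
| 4 | 1 | Olympic | 0.955067 |
| ... | ... | ... | ... |
| 227 | 1 | 22 | 0.999598 |
| 228 | 1 | Lazada | 0.986753 |
| 229 | 1 | Ph | 0.986753 |
| 230 | 1 | Makati | 0.999196 |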


In [66]:
import json
import pandas as pd

# Load JSON from file
with open("../data/out.json", "r") as file:
    data = json.load(file)

# Extract words, confidence, and normalizedVertices
rows = []
for page_idx, page in enumerate(data.get("pages", []), start=1):
    for word in page.get("words", []):
        text = word.get("text", "")
        confidence = float(word.get("confidence", 0))
        
        # Extract bounding polygon coordinates
        bounding_polygon = word.get("boundingPolygon", {}).get("normalizedVertices", [])
        
        for vertex in bounding_polygon:
            rows.append({
                "page": page_idx,
                "text": text,
                "confidence": confidence,
                "x": vertex.get("x", None),
                "y": vertex.get("y", None)
            })

# Convert to DataFrame
df = pd.DataFrame(rows)
df


| | page | text | confidence | x | y |
|---|---|---|---|---|---|
| 0 | 1 | May | 0.923426 | 0.008511 | 0.011342 |
| 1 | 1 | May | 0.923426 | 0.042553 | 0.011342 |
| 2 | 1 | May | 0.923426 | 0.042553 | 0.037807 |
| 3 | 1 | May | 0.923426 | 0.008511 | 0.037807 |
| 4 | 1 | 3 | 0.923426 | 0.045745 | 0.011342 |
| ... | ... | ... | ... | ... | ... |
| 923 | 1 | Makati | 0.999196 | 0.510638 | 0.947070 |
| 924 | 1 | 1130.34 | 0.997102 | 0.891489 | 0.926276 |
| 925 | 1 | 1130.34 | 0.997102 | 0.960638 | 0.926276 |
| 926 | 1 | 1130.34 | 0.997102 | 0.960638 | 0.948960 |
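
The manual loops above can often be replaced with `pd.json_normalize`, which flattens a list of nested records in one call. The sketch below uses a made-up, trimmed-down document with the same `pages` → `words` nesting as the OCR output; the values are illustrative only and are not taken from `out.json`.

```python
import pandas as pd

# Hypothetical, trimmed-down document with the same nesting as the OCR output
data = {
    "pages": [
        {
            "pageNumber": 1,
            "words": [
                {"text": "May", "confidence": 0.92},
                {"text": "3", "confidence": 0.93},
            ],
        },
        {
            "pageNumber": 2,
            "words": [{"text": "Makati", "confidence": 0.99}],
        },
    ]
}

# record_path walks into each page's "words" list;
# meta copies the page number onto every resulting row
df_words = pd.json_normalize(data["pages"], record_path="words", meta=["pageNumber"])
print(df_words)
```

The explicit loop is still the better choice when each record needs custom logic, such as emitting one row per bounding-polygon vertex.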


In [77]:
import requests

url = "https://pseops.azurewebsites.net/api/GetDividendInformation?code=1h/6bs7u4tzxVbSWEpCmqjDMda8tgcD7Pt7tjiT6WOX/YjNMpIbBsQ==&ticker=LTG"

# Send GET request to the API
response = requests.get(url)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    # Parse the JSON response
    data = response.json()

    df = pd.DataFrame(data)
else:
    print(f"Failed to retrieve data. Status code: {response.status_code}")

df

| | ID | Ticker | CompanyName | TypeofSecurity | TypeofDividend | DividendRate | ExDividendDate | RecordDate | PaymentDate | CircularNumber |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1959 | LTG | LT Group, Inc. | COMMON | Cash | Php 0.15 | Mar 08, 2024 | Mar 11, 2024 | Mar 22, 2024 | C00942-2024 |
| 1 | 1958 | LTG | LT Group, Inc. | COMMON | Cash | Php 0.15 | Mar 08, 2024 | Mar 11, 2024 | Mar 22, 2024 | C00943-2024 |
| 2 | 877 | LTG | LT Group, Inc. | COMMON | Cash | Php 0.15 | Mar 01, 2023 | Mar 6, 2023 | Mar 17, 2023 | C01205-2023 |
| 3 | 876 | LTG | LT Group, Inc. | COMMON | Cash | Php 0.15 | Mar 01, 2023 | Mar 6, 2023 | Mar 17, 2023 | C01206-2023 |
| 4 | 227 | LTG | LT Group, Inc. | COMMON | Cash | Php 0.15 | Mar 25, 2022 | Mar 30, 2022 | Apr 12, 2022 | C01718-2022 |
| 5 | 226 | LTG | LT Group, Inc. | COMMON | Cash | Php 0.15 | Mar 25, 2022 | Mar 30, 2022 | Apr 12, 2022 | C01719-2022 |
| 6 | 2572 | LTG | LT Group, Inc. | COMMON | Cash | Php 0.30 | May 31, 2024 | Jun 3, 2024 | Jun 14, 2024 | C03313-2024 |
| 7 | 427 | LTG | LT Group, Inc. | COMMON | Cash | Php 0.30 | May 26, 2022 | May 31, 2022 | Jun 15, 2022 | C03654-2022 |
| 8 | 1105 | LTG | LT Group, Inc. | COMMON | Cash | Php 0.30 | May 25, 2023 | May 30, 2023 | Jun 13, 2023 | C03866-2023 |
| 9 | 225 | LTG | LT Group, Inc. | COMMON | Cash | Php 0.24 | Jun 22, 2021 | Jun 25, 2021 | Jul 9, 2021 | C04063-2021 |


### **Working with `parquet`**

In [3]:
import pandas as pd

# Sample data
data = {
    "id": [1, 2, 3, 4, 5],
    "name": ["Alice", "Bob", "Charlie", "David", "Eve"],
    "age": [25, 30, 35, 40, 45],
    "salary": [50000, 60000, 70000, 80000, 90000]
}

# Convert to DataFrame
df = pd.DataFrame(data)

# Save as Parquet
df.to_parquet("sample.parquet", engine="pyarrow", index=False)
print("Parquet file 'sample.parquet' created successfully!")


Parquet file 'sample.parquet' created successfully!


In [5]:
# Read the Parquet file
df_parquet = pd.read_parquet("sample.parquet")

# Display Data
print("Parquet File Data:")
print(df_parquet)


Parquet File Data:
   id     name  age  salary
0   1    Alice   25   50000
1   2      Bob   30   60000
2   3  Charlie   35   70000
3   4    David   40   80000
4   5      Eve   45   90000


### **Working with `avro`**

Avro allows you to evolve your schema over time without breaking existing data. This is useful in data engineering pipelines where requirements change over time.

**How Avro Supports Schema Evolution**

- **Schema is stored with the data**: Avro embeds the writer's schema inside the file, so consumers can read the data correctly even if the schema changes later.
- **Backward and forward compatibility**: Avro supports adding, removing, or modifying fields in a controlled way (for example, new fields are given default values).

In [15]:
!pip install fastavro

Collecting fastavro
  Downloading fastavro-1.10.0-cp312-cp312-win_amd64.whl.metadata (5.7 kB)
Downloading fastavro-1.10.0-cp312-cp312-win_amd64.whl (487 kB)
Installing collected packages: fastavro
Successfully installed fastavro-1.10.0




In [17]:
import fastavro

schema = {
    "type": "record",
    "name": "Employee",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "salary", "type": "float"}
    ]
}

data = [{"id": 1, "name": "John", "salary": 55000.0}]

with open("data.avro", "wb") as f:
    fastavro.writer(f, schema, data)


In [19]:
with open("data.avro", "rb") as f:
    reader = fastavro.reader(f)
    for record in reader:
        print(record)


{'id': 1, 'name': 'John', 'salary': 55000.0}


| Feature           | Avro  | Parquet | JSON | CSV  |
|------------------|-------|---------|------|------|
| Schema Evolution | Yes | Limited | No | No |
| Storage Efficiency | Compact | Highly Compressed | Large | Large |
| Read Performance | Medium | Best for queries | Slow | Slow |
| Write Performance | Fast | Slower | Medium | Fast |
| Best Use Case | Streaming, Kafka, log storage | Analytical queries (Big Data) | Web APIs, readable storage | Simple tabular data |


### **Serialization & Deserialization**
<hr></hr>

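- Serialization means converting data into a format that can be saved or transmitted; deserialization reconstructs the original data from that format
- Python can work with many serialization formats, such as `pickle` (standard library) and MessagePack (third-party `msgpack` package)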
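
As a quick illustration of the round trip, the sketch below serializes the same object with `pickle` and with `json` (both in the standard library) and compares what comes back. The `record` dictionary is made up for the example.

```python
import json
import pickle

record = {"name": "John", "scores": (90, 85)}  # note the tuple

# Serialize: Python object -> bytes (pickle) or text (JSON)
pickled = pickle.dumps(record)
json_text = json.dumps(record)

# Deserialize: bytes or text -> Python object
from_pickle = pickle.loads(pickled)
from_json = json.loads(json_text)

# pickle restores the tuple; JSON has no tuple type, so it comes back as a list
print(type(pickled).__name__, from_pickle == record)
print(json_text, from_json == record)
```

The choice of format therefore affects type fidelity as well as file size and speed.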

#### **Using Pickle (Python's Native Format)**

Pickle is useful for Python objects but is not cross-language compatible. Only unpickle data you trust, because loading a pickle can execute arbitrary code.

In [23]:
import pickle

data = {"name": "John", "age": 30}

with open("data.pkl", "wb") as f:
    pickle.dump(data, f)

with open("data.pkl", "rb") as f:
    loaded_data = pickle.load(f)

print(loaded_data)  # {'name': 'John', 'age': 30}


{'name': 'John', 'age': 30}


#### **Using MessagePack (Faster & Cross-Language)**

In [28]:
import msgpack

data = {"name": "John", "age": 30}

with open("data.msgpack", "wb") as f:
    f.write(msgpack.packb(data))

with open("data.msgpack", "rb") as f:
    loaded_data = msgpack.unpackb(f.read())

print(loaded_data)  # {'name': 'John', 'age': 30}


{'name': 'John', 'age': 30}



## Importance of Databases in Data Engineering

Databases are a critical part of Data Engineering workflows. Whether dealing with structured or semi-structured data,
we need efficient ways to store, query, and process information. 

- **Relational Databases** are best suited for structured data with strict relationships and ACID compliance.
- **NoSQL Databases** are useful when dealing with flexible schemas, large-scale distributed systems, and high-speed operations.

We will explore the practical use cases for each in this lesson.



## Relational Databases: Azure SQL DB & OCI ADW

Relational databases are queried with **Structured Query Language (SQL)** and are well suited to transactional applications, analytics,
and structured data.

### Why Use Relational Databases?
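- **ACID Compliance**: Transactions are atomic, consistent, isolated, and durable, which keeps data reliable.
- **Structured Schema**: A predefined schema with types and constraints enforces data integrity.
- **SQL-Based Queries**: A declarative query language, with a query optimizer choosing the execution plan.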

### When to Use Relational Databases?
- When dealing with **structured data** with predefined relationships.
- When **transactional consistency** is required.
- When **OLAP (Online Analytical Processing)** is needed.
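
The atomicity part of ACID can be sketched with Python's built-in `sqlite3` module, used here only as a stand-in for Azure SQL DB or Oracle ADW so the example runs without credentials. The `accounts` table and its values are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts ("
    "id INTEGER PRIMARY KEY, "
    "balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 100), (2, 50)])
conn.commit()

try:
    # "with conn" commits on success and rolls back if an exception is raised
    with conn:
        conn.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
        # This debit violates the CHECK constraint, so the whole transfer is undone
        conn.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
except sqlite3.IntegrityError as exc:
    print("Transfer rolled back:", exc)

# Both balances are unchanged: the credit to account 2 was rolled back as well
balances = dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id"))
print(balances)
conn.close()
```

Either both updates take effect or neither does, which is the guarantee that transactional workloads rely on.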

---


In [1]:
!pip install pyodbc



In [3]:
import pyodbc

# Azure SQL Database connection details
server = ""  # Replace with your server name
database = ""  # Replace with your database name
username = ""  # Replace with your username
password = ""  # Replace with your password
driver = "{ODBC Driver 17 for SQL Server}"  # Ensure this driver is installed

# Create connection string
conn_str = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"

try:
    # Establish connection
    conn = pyodbc.connect(conn_str)
    cursor = conn.cursor()
    
    # Test query
    cursor.execute("SELECT @@VERSION")
    row = cursor.fetchone()
    
    print("Connected to Azure SQL Database!")
    print("SQL Server Version:", row[0])

except Exception as e:
    print("Error:", e)

finally:
    if 'conn' in locals():
        conn.close()
        print("Connection closed.")


Connected to Azure SQL Database!
SQL Server Version: Microsoft SQL Azure (RTM) - 12.0.2000.8 
	Feb  9 2025 20:57:20 
	Copyright (C) 2024 Microsoft Corporation

Connection closed.


In [17]:
import requests
import pandas as pd
import pyodbc

# API URL
url = "https://pseops.azurewebsites.net/api/GetDividendInformation?code=1h/6bs7u4tzxVbSWEpCmqjDMda8tgcD7Pt7tjiT6WOX/YjNMpIbBsQ==&ticker=LTG"

# Send GET request to the API
response = requests.get(url)

# Check if request was successful
if response.status_code == 200:
    data = response.json()
    df = pd.DataFrame(data)
else:
    # Stop here if the fetch failed; raising avoids exit(), which can terminate a notebook kernel
    raise RuntimeError(f"Failed to retrieve data. Status code: {response.status_code}")


# Azure SQL Database connection details
server = ""  # Replace with your server name
database = ""  # Replace with your database name
username = ""  # Replace with your username
password = ""  # Replace with your password
driver = "{ODBC Driver 17 for SQL Server}"  # Ensure this driver is installed

# Establish connection
conn_str = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()

# Define the table name
table_name = "DividendInformation"

# Recreate the table from scratch (this drops any existing rows; modify schema as needed)
cursor.execute(f"""
DROP TABLE IF EXISTS dengr.{table_name};

CREATE TABLE dengr.{table_name} (
        ID INT,
        Ticker NVARCHAR(100),
        CompanyName NVARCHAR(100),
        TypeofSecurity NVARCHAR(100),
        TypeofDividend NVARCHAR(100),
        DividendRate NVARCHAR(100),
        ExDividendDate NVARCHAR(100),
        RecordDate NVARCHAR(100),
        PaymentDate NVARCHAR(100),
        CircularNumber NVARCHAR(100)
)
""")
conn.commit()

# Insert data into the table
insert_query = f"""
INSERT INTO dengr.{table_name} (ID,Ticker,CompanyName,TypeofSecurity,TypeofDividend,DividendRate,ExDividendDate,RecordDate,PaymentDate,CircularNumber)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""

# Convert DataFrame to list of tuples for batch insertion
records = df[[
    "ID",
    "Ticker",
    "CompanyName",
    "TypeofSecurity",
    "TypeofDividend",
    "DividendRate",
    "ExDividendDate",
    "RecordDate",
    "PaymentDate",
    "CircularNumber"
]].values.tolist()

# Execute batch insert
cursor.executemany(insert_query, records)
conn.commit()

print(f"Data successfully inserted into Azure SQL table: {table_name}")

# Close connection
cursor.close()
conn.close()

Data successfully inserted into Azure SQL table: DividendInformation
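
As an alternative to writing the `CREATE TABLE` and `INSERT` statements by hand, pandas can write a DataFrame to a table with `DataFrame.to_sql`. The sketch below uses an in-memory SQLite database as a stand-in so that it runs without credentials, and the sample rows are a hypothetical subset of the dividend columns. For Azure SQL, `to_sql` would need a SQLAlchemy engine rather than a raw `pyodbc` connection.

```python
import sqlite3
import pandas as pd

# Hypothetical rows shaped like a subset of the dividend data
sample_df = pd.DataFrame({
    "ID": [1959, 877],
    "Ticker": ["LTG", "LTG"],
    "DividendRate": ["Php 0.15", "Php 0.15"],
})

conn = sqlite3.connect(":memory:")  # stand-in for the real database connection

# if_exists="replace" drops and recreates the table before inserting
sample_df.to_sql("DividendInformation", conn, if_exists="replace", index=False)

# Read the rows back to confirm the load
row_count = conn.execute("SELECT COUNT(*) FROM DividendInformation").fetchone()[0]
round_trip = pd.read_sql_query("SELECT * FROM DividendInformation ORDER BY ID", conn)
print(row_count)
print(round_trip)
conn.close()
```

The hand-written approach gives full control over column types, while `to_sql` infers them from the DataFrame.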


In [21]:
!pip install oracledb

Collecting oracledb
  Downloading oracledb-3.0.0-cp312-cp312-win_amd64.whl.metadata (5.6 kB)
Downloading oracledb-3.0.0-cp312-cp312-win_amd64.whl (2.1 MB)
Installing collected packages: oracledb
Successfully installed oracledb-3.0.0


In [23]:
import oracledb

# Oracle ADW connection configuration
try:
    connection = oracledb.connect(
        config_dir="",
        user="",
        password="",
        dsn="",
        wallet_location="",
        wallet_password=""
    )
    
    cursor = connection.cursor()
    
    # Run a simple test query
    cursor.execute("SELECT 'Connected to Oracle ADW' FROM dual")
    result = cursor.fetchone()
    
    print(result[0])  # Should print "Connected to Oracle ADW"
    
    # Close connection
    cursor.close()
    connection.close()

except oracledb.DatabaseError as e:
    print("Error while connecting to Oracle ADW:", e)


Connected to Oracle ADW


In [29]:
import requests
import pandas as pd
import oracledb

# API URL
url = "https://pseops.azurewebsites.net/api/GetDividendInformation?code=1h/6bs7u4tzxVbSWEpCmqjDMda8tgcD7Pt7tjiT6WOX/YjNMpIbBsQ==&ticker=LTG"

# Send GET request to the API
response = requests.get(url)

# Check if request was successful
if response.status_code == 200:
    data = response.json()
    df = pd.DataFrame(data)
else:
    # Stop here if the fetch failed; raising avoids exit(), which can terminate a notebook kernel
    raise RuntimeError(f"Failed to retrieve data. Status code: {response.status_code}")

# Oracle ADW connection configuration
connection = oracledb.connect(
    config_dir="",
    user="",
    password="",
    dsn="dataengineeringdb_high",
    wallet_location="../adw/Wallet_dataengineeringDB",
    wallet_password=""
)

cursor = connection.cursor()

# Define table name
table_name = "DIVIDEND_INFORMATION"

# Ensure the table exists (modify schema as needed)
cursor.execute(f"""
DECLARE
    table_count NUMBER;
BEGIN
    SELECT COUNT(*) INTO table_count FROM user_tables WHERE table_name = UPPER('{table_name}');
    IF table_count = 0 THEN
        EXECUTE IMMEDIATE '
            CREATE TABLE {table_name} (
                ID NUMBER,
                TICKER VARCHAR2(100),
                COMPANYNAME VARCHAR2(100),
                TYPEOFSECURITY VARCHAR2(100),
                TYPEOFDIVIDEND VARCHAR2(100),
                DIVIDENDRATE VARCHAR2(100),
                EXDIVIDENDDATE VARCHAR2(100),
                RECORDDATE VARCHAR2(100),
                PAYMENTDATE VARCHAR2(100),
                CIRCULARNUMBER VARCHAR2(100)


            )';
    END IF;
END;
""")

# Prepare SQL insert statement
insert_query = f"""
INSERT INTO {table_name} (ID,TICKER,COMPANYNAME,TYPEOFSECURITY,TYPEOFDIVIDEND,DIVIDENDRATE,EXDIVIDENDDATE,RECORDDATE,PAYMENTDATE,CIRCULARNUMBER
)
VALUES (:1, :2, :3, :4, :5, :6, :7, :8, :9, :10)
"""

# Convert DataFrame to list of tuples
records = df[[
    "ID",
    "Ticker",
    "CompanyName",
    "TypeofSecurity",
    "TypeofDividend",
    "DividendRate",
    "ExDividendDate",
    "RecordDate",
    "PaymentDate",
    "CircularNumber"
]].values.tolist()

# Execute batch insert
cursor.executemany(insert_query, records)
connection.commit()

print(f"Data successfully inserted into Oracle ADW table: {table_name}")

# Close connection
cursor.close()
connection.close()


Data successfully inserted into Oracle ADW table: DIVIDEND_INFORMATION



## NoSQL Databases: Azure Cosmos DB & OCI NoSQL

NoSQL databases are designed for **high-speed operations, scalability, and flexible schemas**.
They are useful in applications where relational databases struggle.

### Why Use NoSQL Databases?
- **Schema flexibility**: Allows different data structures in the same collection.
- **Scalability**: Designed to handle large amounts of data.
- **Performance**: Optimized for high-speed read and write operations.

### When to Use NoSQL Databases?
- When working with **semi-structured or unstructured data**.
- When **low-latency, high-speed queries** are required.
- When dealing with **large-scale applications**.
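
Schema flexibility can be illustrated without any database at all. The sketch below models a collection as a plain Python list of dictionaries whose documents have different fields, and filters it with a tiny hypothetical `find` helper. This only mimics the idea of a document store; it is not how MongoDB or OCI NoSQL are implemented, and all field names and values are made up.

```python
# Hypothetical documents with different shapes living in one "collection"
collection = [
    {"_id": 1, "type": "dividend", "ticker": "AAA", "rate": 0.15},
    {"_id": 2, "type": "dividend", "ticker": "BBB", "rate": 0.30, "notes": ["special"]},
    {"_id": 3, "type": "profile", "ticker": "AAA", "address": {"city": "Makati"}},
]

def find(docs, query):
    """Return the documents whose top-level fields match every key in query."""
    return [doc for doc in docs if all(doc.get(k) == v for k, v in query.items())]

# Queries work even though the documents do not share one fixed schema
dividends_for_aaa = find(collection, {"type": "dividend", "ticker": "AAA"})
docs_with_notes = [doc["_id"] for doc in collection if "notes" in doc]

print(dividends_for_aaa)
print(docs_with_notes)
```

In a relational table, the optional `notes` and `address` fields would need extra nullable columns or separate tables.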

---


In [33]:
!pip install borneo oci

Collecting borneo
  Downloading borneo-5.4.2-py3-none-any.whl.metadata (16 kB)
Collecting oci
  Downloading oci-2.149.0-py3-none-any.whl.metadata (5.3 kB)
Collecting circuitbreaker<3.0.0,>=1.3.1 (from oci)
  Downloading circuitbreaker-2.1.0-py2.py3-none-any.whl.metadata (7.8 kB)
Downloading borneo-5.4.2-py3-none-any.whl (175 kB)
Downloading oci-2.149.0-py3-none-any.whl (29.4 MB)

In [65]:
import os
from borneo import (Regions, NoSQLHandle, NoSQLHandleConfig, PutRequest,
                    TableRequest, GetRequest, TableLimits, State)
from borneo.iam import SignatureProvider

# Given a region, instantiate a connection to the cloud service and return it.
# The compartment is set as the default compartment on the handle configuration.
def get_connection(region):
    print("Connecting to the Oracle NoSQL Cloud Service")
    provider = SignatureProvider(config_file="~/.oci/demouser", profile_name="DEMOUSER")
    #provider = SignatureProvider()
    config = NoSQLHandleConfig(region, provider)
    config.set_default_compartment("ocid1.compartment.oc1..aaaaaaaa6x4ezmuleaubvwgsx3jdolywyalb6iyqbw4ucmimzs7rmfiwhktq")
    return NoSQLHandle(config)

# Given a handle to the Oracle NoSQL Database cloud service, the name of the table
# to write the record to, and an instance of a dictionary, formatted as a
# record for the table, this function will write the record to the table
def write_a_record(handle, table_name, record):
    request = PutRequest().set_table_name(table_name)
    request.set_value(record)
    handle.put(request)

# Given a handle to the Oracle NoSQL Database cloud service, the name of the table
# to read from, and the primary key value for the table, this function will
# read the record from the table and return it
def read_a_record(handle, table_name, pk):
    request = GetRequest().set_table_name(table_name)
    request.set_key({'ID' : pk})
    return(handle.get(request))



In [69]:
handle = get_connection("us-phoenix-1")

record = {
    'ID':1969,
    'TICKER':'LTG',
    'COMPANYNAME':'LT Group, Inc.',
    'TYPEOFSECURITY':'COMMON',
    'TYPEOFDIVIDEND':'Cash',
    'DIVIDENDRATE':'Php 0.15',
    'EXDIVIDENDDATE':'Mar 08, 2024',
    'RECORDDATE':'Mar 11, 2024',
    'PAYMENTDATE':'Mar 22, 2024',
    'CIRCULARNUMBER':'C00942-2024'
}
write_a_record(handle, 'DIVIDEND_HISTORY', record)
print('Wrote record: \n\t'  + str(record))

the_written_record = read_a_record(handle, 'DIVIDEND_HISTORY', 1969)
# Print the value read back from the table, not the local dictionary
print('Read record: \n\t' + str(the_written_record.get_value()))

Connecting to the Oracle NoSQL Cloud Service
Wrote record: 
	{'ID': 1969, 'TICKER': 'LTG', 'COMPANYNAME': 'LT Group, Inc.', 'TYPEOFSECURITY': 'COMMON', 'TYPEOFDIVIDEND': 'Cash', 'DIVIDENDRATE': 'Php 0.15', 'EXDIVIDENDDATE': 'Mar 08, 2024', 'RECORDDATE': 'Mar 11, 2024', 'PAYMENTDATE': 'Mar 22, 2024', 'CIRCULARNUMBER': 'C00942-2024'}
Read record: 
	{'ID': 1969, 'TICKER': 'LTG', 'COMPANYNAME': 'LT Group, Inc.', 'TYPEOFSECURITY': 'COMMON', 'TYPEOFDIVIDEND': 'Cash', 'DIVIDENDRATE': 'Php 0.15', 'EXDIVIDENDDATE': 'Mar 08, 2024', 'RECORDDATE': 'Mar 11, 2024', 'PAYMENTDATE': 'Mar 22, 2024', 'CIRCULARNUMBER': 'C00942-2024'}


In [1]:
!pip install pymongo

Collecting pymongo
  Downloading pymongo-4.11.3-cp312-cp312-win_amd64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.11.3-cp312-cp312-win_amd64.whl (882 kB)
Downloading dnspython-2.7.0-py3-none-any.whl (313 kB)
Installing collected packages: dnspython, pymongo
Successfully installed dnspython-2.7.0 pymongo-4.11.3


In [3]:
from pymongo import MongoClient

# Replace with your actual connection string from Azure Cosmos DB
CONNECTION_STRING = ""

# Create a MongoDB client
client = MongoClient(CONNECTION_STRING)

# Select a database
db = client["cicmnldb"]  # Change to your actual database name

# Note: MongoClient connects lazily, so this message alone does not verify the connection
print("Connected to CosmosDB MongoDB successfully!")

client.close()

Connected to CosmosDB MongoDB successfully!




In [11]:
import requests
import pandas as pd

def fetchstockdata(ticker):

    url = f"https://pseops.azurewebsites.net/api/GetDividendInformation?code=1h/6bs7u4tzxVbSWEpCmqjDMda8tgcD7Pt7tjiT6WOX/YjNMpIbBsQ==&ticker={ticker}"
    
    # Send GET request to the API
    response = requests.get(url)
    
    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        # Parse the JSON response
        data = response.json()
    
        df = pd.DataFrame(data)
    else:
        print(f"Failed to retrieve data. Status code: {response.status_code}")
        return pd.DataFrame()  # Empty DataFrame so callers can still check for data
    
    return df

In [25]:
dividends = fetchstockdata("LTG")

In [27]:

CONNECTION_STRING = ""  # Replace with your actual connection string from Azure Cosmos DB
client = MongoClient(CONNECTION_STRING)

db = client["cicmnldb"]  # Change to your database name
collection = db["dividends"]  # Change to your collection name

data_dict = dividends.to_dict(orient="records")

if data_dict:  # Check if DataFrame is not empty
    collection.insert_many(data_dict)
    print(f"Inserted {len(data_dict)} documents into MongoDB!")
else:
    print("No data to insert.")



Inserted 19 documents into MongoDB!


In [19]:
for doc in collection.find():
    print(doc)

{'_id': ObjectId('67e4f350ff3e26e3811729f8'), 'ID': 3586, 'Ticker': 'DMC', 'CompanyName': 'DMCI Holdings, Inc.', 'TypeofSecurity': 'COMMON', 'TypeofDividend': 'Cash', 'DividendRate': 'P0.35', 'ExDividendDate': 'Apr 08, 2025', 'RecordDate': 'Apr 10, 2025', 'PaymentDate': 'Apr 24, 2025', 'CircularNumber': 'C01886-2025'}
{'_id': ObjectId('67e4f350ff3e26e3811729f9'), 'ID': 3587, 'Ticker': 'DMC', 'CompanyName': 'DMCI Holdings, Inc.', 'TypeofSecurity': 'COMMON', 'TypeofDividend': 'Cash', 'DividendRate': 'P0.25', 'ExDividendDate': 'Apr 08, 2025', 'RecordDate': 'Apr 10, 2025', 'PaymentDate': 'Apr 24, 2025', 'CircularNumber': 'C01887-2025'}
{'_id': ObjectId('67e4f350ff3e26e3811729fa'), 'ID': 2220, 'Ticker': 'DMC', 'CompanyName': 'DMCI Holdings, Inc.', 'TypeofSecurity': 'COMMON', 'TypeofDividend': 'Cash', 'DividendRate': 'P0.46', 'ExDividendDate': 'Apr 19, 2024', 'RecordDate': 'Apr 22, 2024', 'PaymentDate': 'May 3, 2024', 'CircularNumber': 'C01920-2024'}
{'_id': ObjectId('67e4f350ff3e26e3811729f

In [23]:
for doc in collection.find({}, {"ID": 0}):
    print(doc)

{'_id': ObjectId('67e4f350ff3e26e3811729f8'), 'Ticker': 'DMC', 'CompanyName': 'DMCI Holdings, Inc.', 'TypeofSecurity': 'COMMON', 'TypeofDividend': 'Cash', 'DividendRate': 'P0.35', 'ExDividendDate': 'Apr 08, 2025', 'RecordDate': 'Apr 10, 2025', 'PaymentDate': 'Apr 24, 2025', 'CircularNumber': 'C01886-2025'}
{'_id': ObjectId('67e4f350ff3e26e3811729f9'), 'Ticker': 'DMC', 'CompanyName': 'DMCI Holdings, Inc.', 'TypeofSecurity': 'COMMON', 'TypeofDividend': 'Cash', 'DividendRate': 'P0.25', 'ExDividendDate': 'Apr 08, 2025', 'RecordDate': 'Apr 10, 2025', 'PaymentDate': 'Apr 24, 2025', 'CircularNumber': 'C01887-2025'}
{'_id': ObjectId('67e4f350ff3e26e3811729fa'), 'Ticker': 'DMC', 'CompanyName': 'DMCI Holdings, Inc.', 'TypeofSecurity': 'COMMON', 'TypeofDividend': 'Cash', 'DividendRate': 'P0.46', 'ExDividendDate': 'Apr 19, 2024', 'RecordDate': 'Apr 22, 2024', 'PaymentDate': 'May 3, 2024', 'CircularNumber': 'C01920-2024'}
{'_id': ObjectId('67e4f350ff3e26e3811729fb'), 'Ticker': 'DMC', 'CompanyName':

In [31]:
for doc in collection.find({"Ticker": "LTG"}, {"ID": 0, "CircularNumber": 0}):
    print(doc)

{'_id': ObjectId('67e4f79bff3e26e381172a05'), 'Ticker': 'LTG', 'CompanyName': 'LT Group, Inc.', 'TypeofSecurity': 'COMMON', 'TypeofDividend': 'Cash', 'DividendRate': 'Php 0.15', 'ExDividendDate': 'Mar 08, 2024', 'RecordDate': 'Mar 11, 2024', 'PaymentDate': 'Mar 22, 2024'}
{'_id': ObjectId('67e4f79bff3e26e381172a06'), 'Ticker': 'LTG', 'CompanyName': 'LT Group, Inc.', 'TypeofSecurity': 'COMMON', 'TypeofDividend': 'Cash', 'DividendRate': 'Php 0.15', 'ExDividendDate': 'Mar 08, 2024', 'RecordDate': 'Mar 11, 2024', 'PaymentDate': 'Mar 22, 2024'}
{'_id': ObjectId('67e4f79bff3e26e381172a07'), 'Ticker': 'LTG', 'CompanyName': 'LT Group, Inc.', 'TypeofSecurity': 'COMMON', 'TypeofDividend': 'Cash', 'DividendRate': 'Php 0.15', 'ExDividendDate': 'Mar 01, 2023', 'RecordDate': 'Mar 6, 2023', 'PaymentDate': 'Mar 17, 2023'}
{'_id': ObjectId('67e4f79bff3e26e381172a08'), 'Ticker': 'LTG', 'CompanyName': 'LT Group, Inc.', 'TypeofSecurity': 'COMMON', 'TypeofDividend': 'Cash', 'DividendRate': 'Php 0.15', 'ExD

### Notes:
- **Use Relational Databases** for structured, transactional workloads.
- **Use NoSQL Databases** for fast, scalable, and schema-flexible applications.
- **Azure SQL DB & OCI ADW** are great for relational workloads.
- **Azure CosmosDB & OCI NoSQL** are useful for NoSQL applications.
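
The contrast in these notes can be sketched without any cloud service. The example below is only an illustration: it uses Python's built-in `sqlite3` as a stand-in for a relational database and plain dictionaries as a stand-in for documents. The table name, columns, and the `Notes` field are hypothetical; the tickers and rates are borrowed from the output above.

```python
import sqlite3

# Relational side: a fixed schema plus atomic transactions
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dividends (ticker TEXT NOT NULL, rate REAL NOT NULL)")

try:
    with conn:  # commits on success, rolls back if any statement fails
        conn.execute("INSERT INTO dividends VALUES (?, ?)", ("DMC", 0.35))
        conn.execute("INSERT INTO dividends VALUES (?, ?)", ("LTG", None))  # violates NOT NULL
except sqlite3.IntegrityError:
    pass  # the whole transaction was rolled back, including the valid first row

row_count = conn.execute("SELECT COUNT(*) FROM dividends").fetchone()[0]
print(row_count)

# Document side: records in one collection may carry different fields
documents = [
    {"Ticker": "DMC", "DividendRate": "P0.35"},
    {"Ticker": "LTG", "DividendRate": "Php 0.15", "Notes": "hypothetical extra field"},
]
field_sets = [sorted(doc) for doc in documents]
print(field_sets)
```

The rejected insert leaves the table empty, which is the transactional guarantee the first note refers to, while the document list accepts records of different shapes without any schema change.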

In [15]:
!pip install azure-storage-file-datalake azure-identity

Collecting azure-storage-file-datalake
  Downloading azure_storage_file_datalake-12.20.0-py3-none-any.whl.metadata (16 kB)
Collecting azure-identity
  Downloading azure_identity-1.21.0-py3-none-any.whl.metadata (81 kB)
Collecting msal>=1.30.0 (from azure-identity)
  Downloading msal-1.32.0-py3-none-any.whl.metadata (11 kB)
Collecting msal-extensions>=1.2.0 (from azure-identity)
  Downloading msal_extensions-1.3.1-py3-none-any.whl.metadata (7.8 kB)
Downloading azure_storage_file_datalake-12.20.0-py3-none-any.whl (263 kB)
Downloading azure_identity-1.21.0-py3-none-any.whl (189 kB)
Downloading msal-1.32.0-py3-none-any.whl (114 kB)
Downloading msal_extensions-1.3.1-py3-none-any.whl (20 kB)
Installing collected packages: msal, azure-storage-file-datalake, msal-extensions, azure-identity
Successfully installed azure-identity-1.21.0 azure-storage-file-datalake-12.20.0 msal-1.32.0 msal-extensions-1.3.1


In [17]:
import os
from azure.storage.filedatalake import (
    DataLakeServiceClient,
    DataLakeDirectoryClient,
    FileSystemClient
)
from azure.identity import DefaultAzureCredential

In [19]:
def get_service_client_sas(account_name: str, sas_token: str) -> DataLakeServiceClient:
    account_url = f"https://{account_name}.dfs.core.windows.net"

    # The SAS token string can be passed in as credential param or appended to the account URL
    service_client = DataLakeServiceClient(account_url, credential=sas_token)

    return service_client

In [21]:
def create_file_system(service_client: DataLakeServiceClient, file_system_name: str) -> FileSystemClient:
    file_system_client = service_client.create_file_system(file_system=file_system_name)

    return file_system_client

In [23]:
def create_directory(file_system_client: FileSystemClient, directory_name: str) -> DataLakeDirectoryClient:
    directory_client = file_system_client.create_directory(directory_name)

    return directory_client

In [25]:
def upload_file_to_directory(directory_client: DataLakeDirectoryClient, local_path: str, file_name: str):
    file_client = directory_client.get_file_client(file_name)

    with open(file=os.path.join(local_path, file_name), mode="rb") as data:
        file_client.upload_data(data, overwrite=True)

In [29]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from azure.storage.filedatalake import DataLakeServiceClient, DataLakeDirectoryClient, FileSystemClient

# Azure Storage details
ACCOUNT_NAME = "ai102form1908822429"  # Your storage account name
CONTAINER_NAME = "dataengineering"  # Your container name
DIRECTORY_NAME = "parquet_data"  # Your directory name
SAS_TOKEN = os.environ["AZURE_STORAGE_SAS_TOKEN"]  # Read the SAS token from an environment variable (name is an assumption); never hard-code secrets

# Generate a Parquet file with 10,000 rows
data = {"id": range(1, 10001), "name": [f"User_{i}" for i in range(1, 10001)]}
df = pd.DataFrame(data)

# Convert DataFrame to Parquet format
parquet_path = "sample_data.parquet"
df.to_parquet(parquet_path, engine="pyarrow")

def get_service_client_sas(account_name: str, sas_token: str) -> DataLakeServiceClient:
    """Authenticate to ADLS Gen2 using a SAS token."""
    account_url = f"https://{account_name}.dfs.core.windows.net"
    service_client = DataLakeServiceClient(account_url, credential=sas_token)
    return service_client

def create_file_system(service_client: DataLakeServiceClient, file_system_name: str) -> FileSystemClient:
    """Create or connect to an ADLS container (file system)."""
    file_system_client = service_client.get_file_system_client(file_system=file_system_name)
    return file_system_client

def create_directory(file_system_client: FileSystemClient, directory_name: str) -> DataLakeDirectoryClient:
    """Create or connect to a directory inside a container."""
    directory_client = file_system_client.get_directory_client(directory_name)
    directory_client.create_directory()
    return directory_client

def upload_file_to_directory(directory_client: DataLakeDirectoryClient, local_path: str, file_name: str):
    """Upload a file to a specified directory in ADLS Gen2."""
    file_client = directory_client.get_file_client(file_name)
    
    # Read file as bytes
    with open(os.path.join(local_path, file_name), "rb") as data:
        file_client.upload_data(data, overwrite=True)

# Authenticate using SAS token
service_client = get_service_client_sas(ACCOUNT_NAME, SAS_TOKEN)

# Connect to the container (file system)
file_system_client = create_file_system(service_client, CONTAINER_NAME)

# Create or connect to the directory
directory_client = create_directory(file_system_client, DIRECTORY_NAME)

# Upload the Parquet file to ADLS Gen2
upload_file_to_directory(directory_client, ".", "sample_data.parquet")

print(f"Parquet file successfully uploaded to ADLS Gen2 in '{DIRECTORY_NAME}/sample_data.parquet'")


✅ Parquet file successfully uploaded to ADLS Gen2 in 'parquet_data/sample_data.parquet'


In [35]:
import pandas as pd
import pyarrow.parquet as pq
import os
from concurrent.futures import ThreadPoolExecutor
from azure.storage.filedatalake import DataLakeServiceClient

# Generate sample data (10,000 rows)
num_rows = 10_000
df = pd.DataFrame({
    "id": range(1, num_rows + 1),
    "name": [f"User_{i}" for i in range(1, num_rows + 1)],
    "amount": [i * 10.5 for i in range(1, num_rows + 1)]
})

# Split the dataframe into 4 chunks
dfs = [df.iloc[i::4] for i in range(4)]

# Function to write and upload each partition
def write_and_upload_parquet(df_chunk, file_name):
    local_file_path = f"./{file_name}"  # Save locally first
    df_chunk.to_parquet(local_file_path, engine="pyarrow")  # Save as Parquet

    # Upload to ADLS
    file_client = directory_client.get_file_client(file_name)
    with open(local_file_path, "rb") as f:
        file_client.upload_data(f, overwrite=True)
    
    os.remove(local_file_path)  # Cleanup local file

# Upload in parallel
file_names = [f"data_part_{i}.parquet" for i in range(4)]
with ThreadPoolExecutor() as executor:
    # Consume the iterator so any exception raised in a worker is re-raised here
    list(executor.map(write_and_upload_parquet, dfs, file_names))

print("4 Parquet files successfully written and uploaded!")

4 Parquet files successfully written and uploaded!
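
When uploads run in a thread pool, a failure in one worker is easy to miss, because an exception only surfaces when that worker's result is read. The sketch below shows one way to collect successes and failures separately. It assumes a hypothetical `fake_upload` function in place of the real ADLS upload, and it fails one file on purpose.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_upload(file_name: str) -> str:
    """Hypothetical stand-in for an upload; fails for one file to show error handling."""
    if file_name == "data_part_2.parquet":
        raise IOError(f"upload failed for {file_name}")
    return file_name


file_names = [f"data_part_{i}.parquet" for i in range(4)]
uploaded, failed = [], []

with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to its file name so failures can be reported
    futures = {executor.submit(fake_upload, name): name for name in file_names}
    for future in as_completed(futures):
        name = futures[future]
        try:
            uploaded.append(future.result())  # re-raises the worker's exception, if any
        except IOError as exc:
            failed.append((name, str(exc)))

print(sorted(uploaded))
print(failed)
```

Reporting success only after checking `failed` avoids printing a success message for a partially uploaded dataset.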


In [17]:
!pip install polars dask

Collecting polars
  Downloading polars-1.26.0-cp39-abi3-win_amd64.whl.metadata (15 kB)
Downloading polars-1.26.0-cp39-abi3-win_amd64.whl (35.6 MB)
Installing collected packages: polars
Successfully installed polars-1.26.0




In [21]:
import os
import polars as pl

# ADLS Gen2 details
account_name = "ai102form1908822429"
container_name = "dataengineering"
directory_path = "parquet_data/"
account_key = os.environ["AZURE_STORAGE_ACCOUNT_KEY"]  # Read the key from an environment variable (name is an assumption); never hard-code secrets

# Construct the ADLS path with the storage account key
parquet_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/{directory_path}"

# Storage options for authentication using account key
storage_options = {
    "account_name": account_name,
    "account_key": account_key
}

# Read Parquet using Polars with authentication
df = pl.read_parquet(parquet_path, storage_options=storage_options)
print(df)


shape: (10_000, 3)
┌───────┬────────────┬──────────┐
│ id    ┆ name       ┆ amount   │
│ ---   ┆ ---        ┆ ---      │
│ i64   ┆ str        ┆ f64      │
╞═══════╪════════════╪══════════╡
│ 1     ┆ User_1     ┆ 10.5     │
│ 5     ┆ User_5     ┆ 52.5     │
│ 9     ┆ User_9     ┆ 94.5     │
│ 13    ┆ User_13    ┆ 136.5    │
│ 17    ┆ User_17    ┆ 178.5    │
│ …     ┆ …          ┆ …        │
│ 9984  ┆ User_9984  ┆ 104832.0 │
│ 9988  ┆ User_9988  ┆ 104874.0 │
│ 9992  ┆ User_9992  ┆ 104916.0 │
│ 9996  ┆ User_9996  ┆ 104958.0 │
│ 10000 ┆ User_10000 ┆ 105000.0 │
└───────┴────────────┴──────────┘


In [25]:
!pip install adlfs pyarrow

Collecting adlfs
  Downloading adlfs-2024.12.0-py3-none-any.whl.metadata (7.7 kB)
Downloading adlfs-2024.12.0-py3-none-any.whl (41 kB)
Installing collected packages: adlfs
Successfully installed adlfs-2024.12.0




In [35]:
import dask.dataframe as dd
import adlfs
import fsspec

# Azure Storage details
account_name = "ai102form1908822429"
container_name = "dataengineering"
directory_path = "parquet_data"  # Folder where parquet files are stored

# Set up the file system to access ADLS Gen2 using the storage account key (account_key is defined in the Polars cell above)
fs = adlfs.AzureBlobFileSystem(account_name=account_name, account_key=account_key)

# Define the path to the Parquet files
parquet_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/{directory_path}/*.parquet"

# Dask reads Parquet files from ADLS Gen2 using the fsspec-compatible file system
df = dd.read_parquet(parquet_path, engine='pyarrow', filesystem=fs)

# Printing a Dask DataFrame shows only its structure; call df.head() or df.compute() to load actual rows
print(df)


Dask DataFrame Structure:
                  id    name   amount
npartitions=4                        
               int64  string  float64
                 ...     ...      ...
                 ...     ...      ...
                 ...     ...      ...
                 ...     ...      ...
Dask Name: read_parquet, 1 expression
Expr=ReadParquetFSSpec(4c09191)


# **Phase 2: Data Ingestion & Processing**
<hr></hr>

## **Data Manipulation with Pandas, Polars, and Dask**

### **Pandas**

Pandas is one of the most widely used libraries for data manipulation in Python. It provides powerful tools for working with structured data, such as **DataFrames** and **Series**.

**Pandas** is ideal for small to medium datasets that fit into memory. It is well suited to data exploration, analysis, and preprocessing before applying machine learning models.

In [77]:
!pip install pandas





In [101]:
import pandas as pd
import numpy as np

# Generate 10,000 random names
names = np.random.choice(['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace', 'Hank', 'Ivy', 'Jack'], 10000)

# Generate random ages between 18 and 80
ages = np.random.randint(18, 81, size=10000)

# Create DataFrame
df = pd.DataFrame({'Name': names, 'Age': ages})

# Display first 5 rows
print(df.head())

# Filtering data: Get people older than 25
adults = df[df['Age'] > 25]
print(adults.head())

# Adding a new column: Mark people older than 30 as seniors
df['Senior'] = df['Age'] > 30
print(df.head())


      Name  Age
0  Charlie   77
1      Eve   23
2    Grace   63
3    Alice   37
4    David   42
      Name  Age
0  Charlie   77
2    Grace   63
3    Alice   37
4    David   42
5    Frank   57
      Name  Age  Senior
0  Charlie   77    True
1      Eve   23   False
2    Grace   63    True
3    Alice   37    True
4    David   42    True


In [103]:
#Handling Missing Data

# Assign the result back; calling fillna(..., inplace=True) on a selected column is deprecated
df['Age'] = df['Age'].fillna(df['Age'].mean())
df2 = df.dropna(subset=['Age'])
df2


Unnamed: 0,Name,Age,Senior
0,Charlie,77,True
1,Eve,23,False
2,Grace,63,True
3,Alice,37,True
4,David,42,True
...,...,...,...
9995,Hank,26,False
9996,David,77,True
9997,Bob,46,True
9998,Ivy,76,True


In [105]:
#Merging and Joining
df1 = pd.DataFrame({'ID': [1, 2, 3], 'Name': ['Alice', 'Bob', 'Charlie']})
df2 = pd.DataFrame({'ID': [1, 2, 4], 'Age': [24, 30, 35]})
merged_df = pd.merge(df1, df2, on='ID', how='left')
print(merged_df)


   ID     Name   Age
0   1    Alice  24.0
1   2      Bob  30.0
2   3  Charlie   NaN
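
The left join above keeps Charlie (ID 3) with a missing age and drops ID 4, which exists only in the right table. As a sketch reusing the same two frames, an outer join with the `indicator=True` option of `pd.merge` labels where each row came from:

```python
import pandas as pd

df1 = pd.DataFrame({'ID': [1, 2, 3], 'Name': ['Alice', 'Bob', 'Charlie']})
df2 = pd.DataFrame({'ID': [1, 2, 4], 'Age': [24, 30, 35]})

# how='outer' keeps unmatched rows from both sides;
# indicator=True adds a _merge column: 'both', 'left_only', or 'right_only'
outer_df = pd.merge(df1, df2, on='ID', how='outer', indicator=True)
print(outer_df.sort_values('ID'))

# Summarize the origin of each ID
origin = dict(zip(outer_df['ID'], outer_df['_merge'].astype(str)))
print(origin)
```

Checking the `_merge` column before choosing `how='left'` or `how='inner'` is a cheap way to find keys that do not match between the two tables.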


In [107]:
#GroupBy Operations

data = {'Category': ['A', 'A', 'B', 'B'], 'Value': [10, 20, 30, 40]}
df_data = pd.DataFrame(data)
grouped = df_data.groupby('Category')['Value'].sum()
print(grouped)


Category
A    30
B    70
Name: Value, dtype: int64


In [109]:
#Advanced Operations with apply()

df['Age in 5 years'] = df['Age'].apply(lambda x: x + 5)
print(df)


         Name  Age  Senior  Age in 5 years
0     Charlie   77    True              82
1         Eve   23   False              28
2       Grace   63    True              68
3       Alice   37    True              42
4       David   42    True              47
...       ...  ...     ...             ...
9995     Hank   26   False              31
9996    David   77    True              82
9997      Bob   46    True              51
9998      Ivy   76    True              81
9999     Jack   74    True              79

[10000 rows x 4 columns]


### **Polars**

**Polars** is a DataFrame library written in Rust on top of the Apache Arrow columnar format. It offers functionality similar to **Pandas** and is typically faster, particularly on multi-core machines, because it executes queries in parallel.


**Polars** suits high-performance aggregations, groupings, and filtering on a single machine, which makes it a good fit for ETL-style preprocessing. It works primarily in memory; its lazy API can also stream some queries over datasets that are larger than RAM.



In [115]:
!pip install polars





In [119]:
#Basic DataFrame Creation & Operations
import numpy as np
import polars as pl

# Generate 10,000 random names
names = np.random.choice(['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace', 'Hank', 'Ivy', 'Jack'], 10000)

# Generate random ages between 18 and 80
ages = np.random.randint(18, 81, size=10000)


# Create a DataFrame
df = pl.DataFrame({
    'Name': names,
    'Age': ages
})

# Display the DataFrame
print(df)

# Filtering data
adults = df.filter(pl.col('Age') > 25)
print(adults)

# Adding a new column
df = df.with_columns((pl.col('Age') > 30).alias('Senior'))
print(df)


shape: (10_000, 2)
┌───────┬─────┐
│ Name  ┆ Age │
│ ---   ┆ --- │
│ str   ┆ i32 │
╞═══════╪═════╡
│ Hank  ┆ 50  │
│ Frank ┆ 34  │
│ Eve   ┆ 49  │
│ Eve   ┆ 60  │
│ Grace ┆ 29  │
│ …     ┆ …   │
│ Frank ┆ 67  │
│ Alice ┆ 74  │
│ Eve   ┆ 70  │
│ Eve   ┆ 26  │
│ David ┆ 19  │
└───────┴─────┘
shape: (8_733, 2)
┌───────┬─────┐
│ Name  ┆ Age │
│ ---   ┆ --- │
│ str   ┆ i32 │
╞═══════╪═════╡
│ Hank  ┆ 50  │
│ Frank ┆ 34  │
│ Eve   ┆ 49  │
│ Eve   ┆ 60  │
│ Grace ┆ 29  │
│ …     ┆ …   │
│ David ┆ 26  │
│ Frank ┆ 67  │
│ Alice ┆ 74  │
│ Eve   ┆ 70  │
│ Eve   ┆ 26  │
└───────┴─────┘
shape: (10_000, 3)
┌───────┬─────┬────────┐
│ Name  ┆ Age ┆ Senior │
│ ---   ┆ --- ┆ ---    │
│ str   ┆ i32 ┆ bool   │
╞═══════╪═════╪════════╡
│ Hank  ┆ 50  ┆ true   │
│ Frank ┆ 34  ┆ true   │
│ Eve   ┆ 49  ┆ true   │
│ Eve   ┆ 60  ┆ true   │
│ Grace ┆ 29  ┆ false  │
│ …     ┆ …   ┆ …      │
│ Frank ┆ 67  ┆ true   │
│ Alice ┆ 74  ┆ true   │
│ Eve   ┆ 70  ┆ true   │
│ Eve   ┆ 26  ┆ false  │
│ David ┆ 19  ┆ false  │


Polars supports lazy execution, which means that operations like filtering, selecting, and aggregating are not executed immediately. Instead, Polars builds a query plan, optimizes it, and executes it only when `.collect()` is called.

### **Benefits of Lazy Execution**

**Query Optimization**

Polars reorders operations for efficiency.

**Example:** If you join two tables and then filter the result, **Polars** may push the filter below the join (predicate pushdown) so that fewer rows are loaded and joined.

**Avoids Intermediate Computation**

If you perform multiple transformations, Polars doesn't compute them step-by-step like Pandas.

Instead, it builds an optimized execution plan and executes everything at once.

**Efficient Memory Usage**

Since no operations are performed until `.collect()`, Polars can avoid loading unnecessary data into memory.

In [121]:
# Lazy Execution

df_lazy = df.lazy()
# collect() triggers execution: Polars optimizes the query, applies all
# transformations, and returns the result as a normal Polars DataFrame.
result = df_lazy.filter(pl.col('Age') > 25).select(['Name']).collect()
print(result)


shape: (8_733, 1)
┌───────┐
│ Name  │
│ ---   │
│ str   │
╞═══════╡
│ Hank  │
│ Frank │
│ Eve   │
│ Eve   │
│ Grace │
│ …     │
│ David │
│ Frank │
│ Alice │
│ Eve   │
│ Eve   │
└───────┘


In [128]:
import polars as pl

# Sample DataFrame
df = pl.DataFrame({"Name": ["Alice", "Bob", "Charlie"], "Age": [24, 30, 35]})

# Eager execution (executes immediately)
df_filtered = df.filter(pl.col("Age") > 25)
print(df_filtered)


shape: (2, 2)
┌─────────┬─────┐
│ Name    ┆ Age │
│ ---     ┆ --- │
│ str     ┆ i64 │
╞═════════╪═════╡
│ Bob     ┆ 30  │
│ Charlie ┆ 35  │
└─────────┴─────┘


In [130]:
df_lazy = df.lazy().filter(pl.col("Age") > 25).select(["Name"])

print(df_lazy.explain())


simple π 1/2 ["Name"]
  FILTER [(col("Age")) > (25)]
  FROM
    DF ["Name", "Age"]; PROJECT["Name", "Age"] 2/2 COLUMNS


In [132]:
df_lazy = (
    df.lazy()
    .filter(pl.col("Age") > 30)  # Filter first
    .with_columns((pl.col("Age") * 2).alias("Age_Double"))  # Then create new column
)

print(df_lazy.explain())


 WITH_COLUMNS:
 [[(col("Age")) * (2)].alias("Age_Double")] 
  FILTER [(col("Age")) > (30)]
  FROM
    DF ["Name", "Age"]; PROJECT */2 COLUMNS


**Use Lazy Execution When**
- Processing large datasets (millions of rows) 🚀
- Working with cloud storage files (Parquet, CSV, etc.) ☁️
- Chaining multiple transformations (filters, joins, aggregations, etc.)

**Use Eager Execution When**
- Small datasets (less than 1M rows)
- Quick exploratory data analysis (EDA)
- You need an immediate result (df.head(), df.describe())

In [140]:
#GroupBy and Aggregations
import polars as pl

data = {'Category': ['A', 'A', 'B', 'B'], 'Value': [10, 20, 30, 40]}
df = pl.DataFrame(data)

result = df.group_by('Category').agg(pl.col('Value').sum())

result.head()

Category,Value
str,i64
"""B""",70
"""A""",30


In [142]:
#Joins and Merging

df1 = pl.DataFrame({'ID': [1, 2, 3], 'Name': ['Alice', 'Bob', 'Charlie']})
df2 = pl.DataFrame({'ID': [1, 2, 4], 'Age': [24, 30, 35]})
joined_df = df1.join(df2, on='ID', how='left')
print(joined_df)


shape: (3, 3)
┌─────┬─────────┬──────┐
│ ID  ┆ Name    ┆ Age  │
│ --- ┆ ---     ┆ ---  │
│ i64 ┆ str     ┆ i64  │
╞═════╪═════════╪══════╡
│ 1   ┆ Alice   ┆ 24   │
│ 2   ┆ Bob     ┆ 30   │
│ 3   ┆ Charlie ┆ null │
└─────┴─────────┴──────┘


### **Dask**

Dask is a parallel computing framework built for handling larger-than-memory datasets, particularly for dataframes that are too large for Pandas. It is designed to scale from a laptop to a cluster of machines.


Dask is particularly useful when you want to scale data processing from one machine to a cluster; it can be deployed on cluster managers such as Kubernetes. It parallelizes Pandas-style operations across many CPU cores by splitting a DataFrame into partitions and processing them in chunks.

In [147]:
!pip install dask





In [149]:
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta

# Define the number of rows
num_rows = 10_000_000  # 10 million rows

# Generate random dates within the last 2 years
start_date = datetime.today() - timedelta(days=730)
dates = [start_date + timedelta(days=random.randint(0, 730)) for _ in range(num_rows)]

# Generate random expense categories
categories = ['Food', 'Transport', 'Entertainment', 'Utilities', 'Healthcare', 'Shopping', 'Rent', 'Miscellaneous']
expense_categories = [random.choice(categories) for _ in range(num_rows)]

# Generate random expense amounts
amounts = np.random.uniform(5, 500, num_rows).round(2)

# Generate random payment methods
payment_methods = ['Cash', 'Credit Card', 'Debit Card', 'Online Payment']
payments = [random.choice(payment_methods) for _ in range(num_rows)]

# Create a DataFrame
df = pd.DataFrame({
    'Date': dates,
    'Category': expense_categories,
    'Amount': amounts,
    'PaymentMethod': payments
})

# Save to CSV
df.to_csv('large_expenses.csv', index=False)

print("CSV file with 10 million rows generated successfully!")


CSV file with 10 million rows generated successfully!


In [151]:
import dask.dataframe as dd

# Create a Dask DataFrame from a CSV file (this is the typical way to use Dask)
df = dd.read_csv('large_expenses.csv')

# Perform operations like Pandas, but Dask handles the computation lazily
filtered_df = df[df['Category'] == "Transport"]

# Compute the result
result = filtered_df.compute()
result.shape


(1250031, 4)

In [160]:
# GroupBy and Aggregations
# Similar to Pandas, but computation is distributed across multiple cores:

group_by_result = df.groupby('Category').Amount.sum().compute()
print(group_by_result)


Category
Entertainment    3.158562e+08
Food             3.155832e+08
Healthcare       3.155109e+08
Miscellaneous    3.153237e+08
Rent             3.156528e+08
Shopping         3.155598e+08
Transport        3.156360e+08
Utilities        3.161124e+08
Name: Amount, dtype: float64


In [162]:
#Optimized Merge
import dask.dataframe as dd

# Read CSV into a Dask DataFrame
df = dd.read_csv("large_expenses.csv")

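# Add an ID column from the index.
# Caveat: dd.read_csv restarts the index at 0 in every partition, so these IDs
# repeat across partitions and are not globally unique.
df = df.assign(ID=df.index)

# compute() materializes the full result as a Pandas DataFrame in memory
df = df.compute()
print(df.head())


# Self-join on ID; because the IDs repeat, each row matches several rows in the output
merged_df = dd.merge(df, df, on='ID')
print(merged_df.compute())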


                         Date       Category  Amount   PaymentMethod  ID
0  2024-12-12 11:40:48.847152  Miscellaneous   74.81      Debit Card   0
1  2024-12-02 11:40:48.847152           Food   75.55  Online Payment   1
2  2024-04-26 11:40:48.847152           Rent   73.79      Debit Card   2
3  2023-07-15 11:40:48.847152      Transport  408.25     Credit Card   3
4  2024-09-11 11:40:48.847152      Utilities  472.66  Online Payment   4
                              Date_x     Category_x  Amount_x PaymentMethod_x  \
0         2024-12-12 11:40:48.847152  Miscellaneous     74.81      Debit Card   
1         2024-12-12 11:40:48.847152  Miscellaneous     74.81      Debit Card   
2         2024-12-12 11:40:48.847152  Miscellaneous     74.81      Debit Card   
3         2024-12-12 11:40:48.847152  Miscellaneous     74.81      Debit Card   
4         2024-12-12 11:40:48.847152  Miscellaneous     74.81      Debit Card   
...                              ...            ...       ...             ..

In [167]:
import dask
from dask import delayed
import pandas as pd

@delayed
def process_csv(file):
    return pd.read_csv(file)

# List of files
files = ["large_expenses.csv", "large_expenses2.csv"]

# Create delayed tasks
dfs = [process_csv(file) for file in files]

# Execute all tasks in one call so Dask can run them in parallel
# (calling .compute() on each task in a loop would run them one after another)
results = list(dask.compute(*dfs))

print(results)


[                               Date       Category  Amount   PaymentMethod
0        2024-12-12 11:40:48.847152  Miscellaneous   74.81      Debit Card
1        2024-12-02 11:40:48.847152           Food   75.55  Online Payment
2        2024-04-26 11:40:48.847152           Rent   73.79      Debit Card
3        2023-07-15 11:40:48.847152      Transport  408.25     Credit Card
4        2024-09-11 11:40:48.847152      Utilities  472.66  Online Payment
...                             ...            ...     ...             ...
9999995  2024-02-11 11:40:48.847152           Rent  344.01     Credit Card
9999996  2024-07-05 11:40:48.847152           Food  284.36     Credit Card
9999997  2023-09-11 11:40:48.847152       Shopping  396.34  Online Payment
9999998  2023-10-20 11:40:48.847152      Transport  456.95            Cash
9999999  2024-10-01 11:40:48.847152  Entertainment  127.26      Debit Card

[10000000 rows x 4 columns],                                Date       Category  Amount   PaymentM

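Dask’s `delayed()` defers execution: calling a delayed function records a task in a graph instead of running it.

Wrap expensive, independent function calls in `delayed()`; wrapping cheap operations or plain values such as strings only adds scheduling overhead.

Call `.compute()` (or `dask.compute()` for several tasks at once) to execute the task graph once it is fully built.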
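
As a minimal sketch of these points, the hypothetical functions below build a small task graph in which five independent tasks feed a final combining step. Only the last line triggers any work.

```python
from dask import delayed


@delayed
def square(x):
    # Stand-in for an expensive, independent computation
    return x * x


@delayed
def total(values):
    # Runs only after all of its inputs have been computed
    return sum(values)


# Calling delayed functions records tasks; nothing runs yet
squares = [square(n) for n in range(5)]
graph = total(squares)

# A single compute() executes the whole graph; independent tasks may run in parallel
result = graph.compute()
print(result)
```

Because the five `square` tasks do not depend on each other, the scheduler is free to run them concurrently before `total` combines them.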

#### **Comparison of When to Use Each Tool**

<div style="float: left;">

| Tool | Best for | Performance | Scale | Memory Usage |
|------|----------|-------------|-------|--------------|
| **Pandas** | Small to medium-sized datasets | Fast for in-memory operations | Limited to a single machine | High (the whole dataset must fit in RAM) |
| **Polars** | Large datasets on one machine | Very fast, especially on multi-core systems | Single machine, multi-core | Moderate (lazy streaming can handle some larger-than-memory datasets) |
| **Dask** | Large, out-of-memory datasets | Scalable, handles parallel processing | Scales to multiple machines | Low per worker (works on chunks of data) |
</div>