## Structured Data

In [None]:
import sqlite3

In [None]:
# connect to in-memory database
conn = sqlite3.connect(':memory:')
cur = conn.cursor()

In [None]:
# create a employee table
cur.execute('''
    CREATE TABLE employee (
        id INTEGER PRIMARY KEY,
        name TEXT,
        department TEXT,
        salary REAL
    )
''')

<sqlite3.Cursor at 0x7e9d49c4f540>

In [None]:
# insert few sample data
cur.execute("INSERT INTO employee VALUES (1, 'John', 'HR', 50000)")
cur.execute("INSERT INTO employee VALUES (2, 'Jane', 'IT', 60000)")
cur.execute("INSERT INTO employee VALUES (3, 'Bob', 'Finance', 55000)")

<sqlite3.Cursor at 0x7e9d49c4f540>

In [None]:
# read the data
cur.execute("SELECT salary FROM employee where department='IT';")
rows = cur.fetchall()
for row in rows:
    print(row)

(60000.0,)


In [None]:
# update the data
cur.execute("UPDATE employee SET salary = 65000 WHERE id = 2;")


<sqlite3.Cursor at 0x7e9d8f55e0c0>

In [None]:
# delete the data
cur.execute("DELETE FROM employee WHERE id = 3;")

<sqlite3.Cursor at 0x7e9d8f55e0c0>

In [None]:
cur.execute("SELECT * FROM employee;")
rows = cur.fetchall()
for row in rows:
    print(row)

(1, 'John', 'HR', 50000.0)
(2, 'Jane', 'IT', 65000.0)


In [None]:
conn.close()

## Semi structured Data

In [None]:
import json
import pandas as pd

In [None]:
# sample data in json format
data = [
    {
        "id": 1,
        "name": "Alice",
        "contact": {
            "email": "alice@example.com",
            "phone": "123-456-7890"
        },
        "skills": ["Python", "SQL"]
    },
    {
        "id": 2,
        "name": "Bob",
        "contact": {
            "email": "bob@example.com",
            "phone": "987-654-3210"
        },
              "skills": ["Java", "AWS"]
    }
]

In [None]:
data[0]['contact']['email']

'alice@example.com'

In [None]:
df = pd.json_normalize(data)

In [None]:
df

Unnamed: 0,id,name,skills,contact.email,contact.phone
0,1,Alice,"[Python, SQL]",alice@example.com,123-456-7890
1,2,Bob,"[Java, AWS]",bob@example.com,987-654-3210


In [None]:
df[df['name']=="Bob"]

Unnamed: 0,id,name,skills,contact.email,contact.phone
1,2,Bob,"[Java, AWS]",bob@example.com,987-654-3210


In [None]:
from lxml import etree

In [None]:
xml = """
<bookstore>
  <book>
    <title>Python Basics</title>
    <price>45</price>
  </book>
  <book>
    <title>XML Essentials</title>
    <price>25</price>
  </book>
</bookstore>
"""

In [None]:
tree = etree.fromstring(xml)

In [None]:
titles = tree.xpath('/bookstore/book/title/text()')
print(titles)

['Python Basics', 'XML Essentials']


In [None]:
titles = tree.xpath('//book[price>30]/title/text()')
print(titles)

['Python Basics']


## Unstructured Data

In [None]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import string

In [None]:
nltk.download("punkt")
nltk.download("punkt_tab")
nltk.download("stopwords")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [None]:
stopwords.words('english')

['a',
 'about',
 'above',
 'after',
 'again',
 'against',
 'ain',
 'all',
 'am',
 'an',
 'and',
 'any',
 'are',
 'aren',
 "aren't",
 'as',
 'at',
 'be',
 'because',
 'been',
 'before',
 'being',
 'below',
 'between',
 'both',
 'but',
 'by',
 'can',
 'couldn',
 "couldn't",
 'd',
 'did',
 'didn',
 "didn't",
 'do',
 'does',
 'doesn',
 "doesn't",
 'doing',
 'don',
 "don't",
 'down',
 'during',
 'each',
 'few',
 'for',
 'from',
 'further',
 'had',
 'hadn',
 "hadn't",
 'has',
 'hasn',
 "hasn't",
 'have',
 'haven',
 "haven't",
 'having',
 'he',
 "he'd",
 "he'll",
 'her',
 'here',
 'hers',
 'herself',
 "he's",
 'him',
 'himself',
 'his',
 'how',
 'i',
 "i'd",
 'if',
 "i'll",
 "i'm",
 'in',
 'into',
 'is',
 'isn',
 "isn't",
 'it',
 "it'd",
 "it'll",
 "it's",
 'its',
 'itself',
 "i've",
 'just',
 'll',
 'm',
 'ma',
 'me',
 'mightn',
 "mightn't",
 'more',
 'most',
 'mustn',
 "mustn't",
 'my',
 'myself',
 'needn',
 "needn't",
 'no',
 'nor',
 'not',
 'now',
 'o',
 'of',
 'off',
 'on',
 'once',
 'on

In [None]:
text = """
OpenAI released a new version of GPT, and it’s changing the way we work with text data in AI applications.
"""

In [None]:
# 1.lowercase + remove punctuation
text = text.lower()
text = text.translate(str.maketrans('', '', string.punctuation))

In [None]:
text

'\nopenai released a new version of gpt and it’s changing the way we work with text data in ai applications\n'

In [None]:
# 2. tokenization
tokens = word_tokenize(text)

In [None]:
tokens

['openai',
 'released',
 'a',
 'new',
 'version',
 'of',
 'gpt',
 'and',
 'it',
 '’',
 's',
 'changing',
 'the',
 'way',
 'we',
 'work',
 'with',
 'text',
 'data',
 'in',
 'ai',
 'applications']

In [None]:
# 3 remove stopwords
stop_words = set(stopwords.words('english'))
filtered_tokens = [token for token in tokens if token not in stop_words]

In [None]:
filtered_tokens

['openai',
 'released',
 'new',
 'version',
 'gpt',
 '’',
 'changing',
 'way',
 'work',
 'text',
 'data',
 'ai',
 'applications']

In [None]:
import spacy

In [None]:
nlp = spacy.load("en_core_web_sm")

In [None]:
text = """
OpenAI released a new version of GPT, and it’s changing the way we work with text data in AI applications.
"""

In [None]:
doc = nlp(text)

In [None]:
# tokenization and stopword removal
tokens = [token.text.lower() for token in doc if not token.is_stop and not token.is_punct]

In [None]:
tokens

['\n',
 'openai',
 'released',
 'new',
 'version',
 'gpt',
 'changing',
 'way',
 'work',
 'text',
 'data',
 'ai',
 'applications',
 '\n']

In [None]:
# NER
for ent in doc.ents:
    print(ent.text, ent.label_)

OpenAI PERSON
GPT ORG
AI GPE


In [None]:
import pandas as pd

df = pd.DataFrame({
    "user_id": [1, 2],
    "event": ["view_product", "add_to_cart"],
    "timestamp": ["2024-07-01T12:00:00", "2024-07-01T12:05:00"]
})
df.to_parquet("clickstream.parquet")


In [None]:
import json
import spacy


In [None]:
nlp = spacy.load("en_core_web_sm")

In [None]:
# structured csv
print("[1] Orders CSV")
orders = pd.read_csv("orders.csv")
print("Classificaiton: Structured")
orders.head()

[1] Orders CSV
Classificaiton: Structured


Unnamed: 0,order_id,customer_id,order_date,amount
0,101,2001,2024-05-01,250.75
1,102,2002,2024-05-02,140.0


In [None]:
# 2. Semi-Structured – JSON
print("\n[2] Product Catalog (JSON):")
with open("products.json") as f:
    products = json.load(f)
print(products[0])
print("🔹 Classification: Semi-Structured")
print("🔸 Suggested Tool: MongoDB / Document DB")


[2] Product Catalog (JSON):
{'product_id': 'P001', 'name': 'Wireless Mouse', 'specs': {'color': 'black', 'connectivity': 'Bluetooth'}}
🔹 Classification: Semi-Structured
🔸 Suggested Tool: MongoDB / Document DB


In [None]:
# 3. Unstructured – Reviews Text
print("\n[3] User Reviews (Text):")
with open("reviews.txt") as f:
    reviews = f.read()
doc = nlp(reviews)
entities = [(ent.text, ent.label_) for ent in doc.ents]
print("Named Entities:", entities)
print("🔹 Classification: Unstructured")
print("🔸 Suggested Tool: Elasticsearch / NLP Pipeline")


[3] User Reviews (Text):
Named Entities: []
🔹 Classification: Unstructured
🔸 Suggested Tool: Elasticsearch / NLP Pipeline


In [None]:
# 4. Clickstream – Parquet
print("\n[4] Clickstream Events (Parquet):")
clicks = pd.read_parquet("clickstream.parquet")
print(clicks.head())
print("🔹 Classification: Semi-Structured")
print("🔸 Suggested Tool: S3 + Athena / Spark / BigQuery")


[4] Clickstream Events (Parquet):
   user_id         event            timestamp
0        1  view_product  2024-07-01T12:00:00
1        2   add_to_cart  2024-07-01T12:05:00
🔹 Classification: Semi-Structured
🔸 Suggested Tool: S3 + Athena / Spark / BigQuery


## Data Warehouse

### OLTP

In [1]:
import sqlite3


In [2]:
# connecting to in memory db
conn = sqlite3.connect(':memory:')
cur = conn.cursor()

In [3]:
# create table
cur.executescript(
    """
    CREATE TABLE Customers (
    CustomerID INTEGER PRIMARY KEY AUTOINCREMENT,
    Name TEXT,
    Email TEXT UNIQUE
    );

    CREATE TABLE Products (
        ProductID INTEGER PRIMARY KEY AUTOINCREMENT,
        Name TEXT,
        Price REAL,
        StockQty INTEGER
    );

    CREATE TABLE Orders (
        OrderID INTEGER PRIMARY KEY AUTOINCREMENT,
        CustomerID INTEGER,
        OrderDate TEXT DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (CustomerID) REFERENCES Customers(CustomerID)
    );

    CREATE TABLE OrderItems (
        OrderItemID INTEGER PRIMARY KEY AUTOINCREMENT,
        OrderID INTEGER,
        ProductID INTEGER,
        Quantity INTEGER,
        ItemPrice REAL,
        FOREIGN KEY (OrderID) REFERENCES Orders(OrderID),
        FOREIGN KEY (ProductID) REFERENCES Products(ProductID)
    );
    """
)

<sqlite3.Cursor at 0x78293dc15c40>

In [4]:
conn.commit()

In [5]:
products = [
    ('Laptop', 75000.0, 10),
    ('Headphones', 1500.0, 50),
    ('Keyboard', 2000.0, 30),
]

In [6]:
cur.executemany("INSERT INTO Products (Name, Price, StockQty) VALUES (?, ?, ?)", products)
conn.commit()

In [7]:
def place_order(name, email, product_id, quantity):
    try:
        conn.execute('BEGIN')  # Begin transaction

        # Insert customer if not exists
        cur.execute('INSERT OR IGNORE INTO Customers (Name, Email) VALUES (?, ?)', (name, email))
        cur.execute('SELECT CustomerID FROM Customers WHERE Email = ?', (email,))
        customer_id = cur.fetchone()[0]

        # Check stock
        cur.execute('SELECT StockQty, Price FROM Products WHERE ProductID = ?', (product_id,))
        row = cur.fetchone()
        if not row or row[0] < quantity:
            raise Exception("Insufficient stock")

        stock_qty, price = row
        total_price = price * quantity

        # Insert order
        cur.execute('INSERT INTO Orders (CustomerID) VALUES (?)', (customer_id,))
        order_id = cur.lastrowid

        # Insert order item
        cur.execute('''
            INSERT INTO OrderItems (OrderID, ProductID, Quantity, ItemPrice)
            VALUES (?, ?, ?, ?)
        ''', (order_id, product_id, quantity, total_price))

        # Update stock
        cur.execute('UPDATE Products SET StockQty = StockQty - ? WHERE ProductID = ?', (quantity, product_id))

        conn.commit()
        print(f"✅ Order placed successfully for OrderID: {order_id}")

    except Exception as e:
        conn.rollback()
        print(f"❌ Order failed: {str(e)}")


In [8]:
place_order("John Paul", "johnpaul@gmail.com",product_id=1, quantity=2)

✅ Order placed successfully for OrderID: 1


In [9]:
print("Remaining Stock:")
cur.execute('SELECT Name, StockQty FROM Products')
for row in cur.fetchall():
    print(f"{row[0]}: {row[1]}")

Remaining Stock:
Laptop: 8
Headphones: 50
Keyboard: 30


In [10]:
print("Orders:")
cur.execute('''SELECT Orders.OrderID, Customers.Name, Products.Name, OrderItems.Quantity, OrderItems.ItemPrice
FROM Orders
JOIN Customers ON Orders.CustomerID = Customers.CustomerID
JOIN OrderItems ON Orders.OrderID = OrderItems.OrderID
JOIN Products ON OrderItems.ProductID = Products.ProductID''')
for row in cur.fetchall():
    print(f"OrderID: {row[0]}, Customer: {row[1]}, Product: {row[2]}, Quantity: {row[3]}, ItemPrice: {row[4]}")

Orders:
OrderID: 1, Customer: John Paul, Product: Laptop, Quantity: 2, ItemPrice: 150000.0


In [11]:
place_order("John Paul", "johnpaul@gmail.com",product_id=1, quantity=20)

❌ Order failed: Insufficient stock


### Star Schema

In [13]:
# Create Dimensions
cur.executescript("""
CREATE TABLE Dim_Date (
    Date_Key INTEGER PRIMARY KEY,
    Full_Date TEXT,
    Year INTEGER,
    Month INTEGER,
    Day INTEGER
);

CREATE TABLE Dim_Store (
    Store_Key INTEGER PRIMARY KEY,
    Store_Name TEXT,
    City TEXT,
    Region TEXT
);

CREATE TABLE Dim_Product (
    Product_Key INTEGER PRIMARY KEY,
    Product_Name TEXT,
    Category TEXT,
    Brand TEXT
);

-- Create Fact Table
CREATE TABLE Fact_Sales (
    Sales_ID INTEGER PRIMARY KEY AUTOINCREMENT,
    Date_Key INTEGER,
    Store_Key INTEGER,
    Product_Key INTEGER,
    Units_Sold INTEGER,
    Revenue REAL,
    FOREIGN KEY(Date_Key) REFERENCES Dim_Date(Date_Key),
    FOREIGN KEY(Store_Key) REFERENCES Dim_Store(Store_Key),
    FOREIGN KEY(Product_Key) REFERENCES Dim_Product(Product_Key)
);
""")

conn.commit()
print("✅ Star schema tables created.")

✅ Star schema tables created.


In [14]:
cur.executemany("INSERT INTO Dim_Date VALUES (?, ?, ?, ?, ?)", [
    (20240701, "2024-07-01", 2024, 7, 1),
    (20240702, "2024-07-02", 2024, 7, 2),
])

# Store Dimension
cur.executemany("INSERT INTO Dim_Store VALUES (?, ?, ?, ?)", [
    (1, "Store A", "Mumbai", "West"),
    (2, "Store B", "Delhi", "North"),
])

# Product Dimension
cur.executemany("INSERT INTO Dim_Product VALUES (?, ?, ?, ?)", [
    (100, "Laptop", "Electronics", "Lenovo"),
    (101, "Mouse", "Accessories", "Logitech"),
])

conn.commit()
print("✅ Dimension data inserted.")

✅ Dimension data inserted.


In [15]:
# Fact Sales
cur.executemany("INSERT INTO Fact_Sales (Date_Key, Store_Key, Product_Key, Units_Sold, Revenue) VALUES (?, ?, ?, ?, ?)", [
    (20240701, 1, 100, 3, 210000),
    (20240701, 2, 101, 10, 15000),
    (20240702, 1, 101, 5, 7500),
])

conn.commit()
print("✅ Fact table populated.")


✅ Fact table populated.


In [22]:
# Total Revenue by Region and Product Category
query = """
SELECT S.Region, P.Category, SUM(F.Revenue) AS Total_Revenue, SUM(F.Units_Sold) AS Total_Units_Sold
FROM Fact_Sales F
JOIN Dim_Store S ON F.Store_Key = S.Store_Key
JOIN Dim_Product P ON F.Product_Key = P.Product_Key
GROUP BY S.Region, P.Category
ORDER BY Total_Revenue DESC;
"""


In [23]:
import pandas as pd
pd.read_sql_query(query, conn)

Unnamed: 0,Region,Category,Total_Revenue,Total_Units_Sold
0,West,Electronics,210000.0,3
1,North,Accessories,15000.0,10
2,West,Accessories,7500.0,5


### Mini-ETL

In [24]:
import pandas as pd
import sqlite3
import requests
import io

In [25]:
url = "https://people.sc.fsu.edu/~jburkardt/data/csv/airtravel.csv"

In [26]:
csv_raw = requests.get(url).text

In [27]:
csv_raw

'"Month", "1958", "1959", "1960"\n"JAN",  340,  360,  417\n"FEB",  318,  342,  391\n"MAR",  362,  406,  419\n"APR",  348,  396,  461\n"MAY",  363,  420,  472\n"JUN",  435,  472,  535\n"JUL",  491,  548,  622\n"AUG",  505,  559,  606\n"SEP",  404,  463,  508\n"OCT",  359,  407,  461\n"NOV",  310,  362,  390\n"DEC",  337,  405,  432\n\n'

In [28]:
df = pd.read_csv(io.StringIO(csv_raw))

In [29]:
df

Unnamed: 0,Month,"""1958""","""1959""","""1960"""
0,JAN,340,360,417
1,FEB,318,342,391
2,MAR,362,406,419
3,APR,348,396,461
4,MAY,363,420,472
5,JUN,435,472,535
6,JUL,491,548,622
7,AUG,505,559,606
8,SEP,404,463,508
9,OCT,359,407,461


In [30]:
df.columns = [c.lower() for c in df.columns]

In [31]:
df

Unnamed: 0,month,"""1958""","""1959""","""1960"""
0,JAN,340,360,417
1,FEB,318,342,391
2,MAR,362,406,419
3,APR,348,396,461
4,MAY,363,420,472
5,JUN,435,472,535
6,JUL,491,548,622
7,AUG,505,559,606
8,SEP,404,463,508
9,OCT,359,407,461


In [37]:
# renaming cloumns
df.rename(columns={'1958': '58', "1959": "59"}, inplace=True)

In [38]:
df

Unnamed: 0,month,"""1958""","""1959""","""1960"""
0,JAN,340,360,417
1,FEB,318,342,391
2,MAR,362,406,419
3,APR,348,396,461
4,MAY,363,420,472
5,JUN,435,472,535
6,JUL,491,548,622
7,AUG,505,559,606
8,SEP,404,463,508
9,OCT,359,407,461


In [34]:
conn = sqlite3.connect("./mini_etl.db")
cur = conn.cursor()

In [35]:
df.to_sql("airtravel", conn, if_exists="replace", index=False)

12

In [36]:
# read sql table
pd.read_sql_query("SELECT * FROM airtravel", conn)

Unnamed: 0,month,"""1958""","""1959""","""1960"""
0,JAN,340,360,417
1,FEB,318,342,391
2,MAR,362,406,419
3,APR,348,396,461
4,MAY,363,420,472
5,JUN,435,472,535
6,JUL,491,548,622
7,AUG,505,559,606
8,SEP,404,463,508
9,OCT,359,407,461
