In [1]:
import pandas as pd
import sqlite3
import pyarrow as pa
import pyarrow.parquet as pq
import fastavro

# Step 1: Load CSV into SQLite
df = pd.read_csv('HousingData.csv')
conn = sqlite3.connect('housing.db')
df.to_sql('housing', conn, if_exists='replace', index=False)

# Step 2: Export to CSV
df.to_csv('housing_output.csv', index=False)

# Step 3: Export to Parquet
table = pa.Table.from_pandas(df)
pq.write_table(table, 'housing.parquet')

# Step 4: Export to Avro
records = df.to_dict(orient='records')
schema = {
    "doc": "Housing Data",
    "name": "HousingRecord",
    "namespace": "housing.avro",
    "type": "record",
    "fields": [
        {"name": col, "type": ["null", "double"]} for col in df.columns
    ]
}
with open('housing.avro', 'wb') as out:
    fastavro.writer(out, schema, records)

conn.close()


In [2]:
import schedule
import time

def export_pipeline():
    conn = sqlite3.connect('housing.db')
    df = pd.read_sql("SELECT * FROM housing", conn)
    df.to_csv("scheduled_output.csv", index=False)
    conn.close()
    print("Scheduled export completed.")

# Schedule every 1 minute (example)
schedule.every(1).minutes.do(export_pipeline)

# Simulate 3 cycles
for _ in range(3):
    schedule.run_pending()
    time.sleep(60)


Scheduled export completed.
Scheduled export completed.


In [3]:
source_conn = sqlite3.connect('housing.db')
dest_conn = sqlite3.connect('housing_copy.db')

# Copy all tables dynamically
tables = source_conn.execute("SELECT name FROM sqlite_master WHERE type='table';").fetchall()

for table_name in tables:
    table = table_name[0]
    df = pd.read_sql(f"SELECT * FROM {table}", source_conn)
    df.to_sql(table, dest_conn, if_exists='replace', index=False)

source_conn.close()
dest_conn.close()


In [4]:
source_conn = sqlite3.connect('housing.db')
dest_conn = sqlite3.connect('housing_filtered_copy.db')

# Example: Select a few columns
df = pd.read_sql("SELECT CRIM, ZN, INDUS FROM housing", source_conn)
df.to_sql('housing_filtered', dest_conn, if_exists='replace', index=False)

source_conn.close()
dest_conn.close()


In [5]:
print(df.head())  # Show first 5 rows
print("Rows loaded:", len(df))


      CRIM    ZN  INDUS
0  0.00632  18.0   2.31
1  0.02731   0.0   7.07
2  0.02729   0.0   7.07
3  0.03237   0.0   2.18
4  0.06905   0.0   2.18
Rows loaded: 506


In [6]:
print("CSV export complete: housing_output.csv")
print("Parquet export complete: housing.parquet")
print("Avro export complete: housing.avro")


CSV export complete: housing_output.csv
Parquet export complete: housing.parquet
Avro export complete: housing.avro


In [7]:
import os

print("CSV exists:", os.path.exists("housing_output.csv"))
print("Parquet exists:", os.path.exists("housing.parquet"))
print("Avro exists:", os.path.exists("housing.avro"))


CSV exists: True
Parquet exists: True
Avro exists: True


In [8]:
# Check CSV
print(pd.read_csv("housing_output.csv").head())

# Check Parquet
print(pq.read_table("housing.parquet").to_pandas().head())


      CRIM    ZN  INDUS  CHAS    NOX     RM   AGE     DIS  RAD  TAX  PTRATIO  \
0  0.00632  18.0   2.31   0.0  0.538  6.575  65.2  4.0900    1  296     15.3   
1  0.02731   0.0   7.07   0.0  0.469  6.421  78.9  4.9671    2  242     17.8   
2  0.02729   0.0   7.07   0.0  0.469  7.185  61.1  4.9671    2  242     17.8   
3  0.03237   0.0   2.18   0.0  0.458  6.998  45.8  6.0622    3  222     18.7   
4  0.06905   0.0   2.18   0.0  0.458  7.147  54.2  6.0622    3  222     18.7   

        B  LSTAT  MEDV  
0  396.90   4.98  24.0  
1  396.90   9.14  21.6  
2  392.83   4.03  34.7  
3  394.63   2.94  33.4  
4  396.90    NaN  36.2  
      CRIM    ZN  INDUS  CHAS    NOX     RM   AGE     DIS  RAD  TAX  PTRATIO  \
0  0.00632  18.0   2.31   0.0  0.538  6.575  65.2  4.0900    1  296     15.3   
1  0.02731   0.0   7.07   0.0  0.469  6.421  78.9  4.9671    2  242     17.8   
2  0.02729   0.0   7.07   0.0  0.469  7.185  61.1  4.9671    2  242     17.8   
3  0.03237   0.0   2.18   0.0  0.458  6.998  45.8