In [7]:
from dotenv import load_dotenv
import os
# Pull the key/value pairs from the local .env file into the process environment.
load_dotenv()

# Storage account and service-principal settings (each is None when the variable is unset).
storage_account = os.environ.get("AZURE_STORAGE_ACCOUNT")
adls_tenant_id = os.environ.get("ADLS_TENANT_ID")
adls_client_id = os.environ.get("ADLS_CLIENT_ID")
adls_client_secret = os.environ.get("ADLS_CLIENT_SECRET")

# Connection string and container name read by the blob-download cell.
value = os.environ.get('AZURE_DATA_LAKE_CONNECTION_STRING')
container_name = os.environ.get('AZURE_CONTAINER_NAME')

In [2]:
pip install -r requirements.txt

Collecting azure-storage-file-datalake (from -r requirements.txt (line 3))
  Downloading azure_storage_file_datalake-12.18.0-py3-none-any.whl.metadata (16 kB)
Downloading azure_storage_file_datalake-12.18.0-py3-none-any.whl (258 kB)
   ---------------------------------------- 0.0/258.4 kB ? eta -:--:--
   ---------------------------------------  256.0/258.4 kB 5.2 MB/s eta 0:00:01
   ---------------------------------------- 258.4/258.4 kB 5.3 MB/s eta 0:00:00
Installing collected packages: azure-storage-file-datalake
Successfully installed azure-storage-file-datalake-12.18.0
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [4]:
from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient

# Build a service-principal credential from the values loaded from the environment.
credential = ClientSecretCredential(
    tenant_id=adls_tenant_id,
    client_id=adls_client_id,
    client_secret=adls_client_secret,
)

# Use the account configured in AZURE_STORAGE_ACCOUNT instead of a hard-coded URL;
# fall back to the previously hard-coded account when the variable is unset.
# NOTE(review): assumes AZURE_STORAGE_ACCOUNT holds the bare account name,
# not a full URL -- confirm against the .env file.
account_name = storage_account or "myvisekendatalake"
service_client = BlobServiceClient(
    account_url=f"https://{account_name}.blob.core.windows.net",
    credential=credential,
)

# List containers to verify connectivity
print([container.name for container in service_client.list_containers()])


['iceberg-metadata', 'myviseken-data-lake']


In [12]:
import io

import pandas as pd
from azure.storage.blob import BlobServiceClient

# Which object to fetch: container comes from the environment, blob path is fixed.
container_name = os.environ.get('AZURE_CONTAINER_NAME')
blob_name = "raw/carlist/data_20250102170445.csv"

# Connect with the data-lake connection string and point a client at the blob.
blob_service_client = BlobServiceClient.from_connection_string(value)
blob_client = blob_service_client.get_blob_client(
    container=container_name,
    blob=blob_name,
)

# Read the whole blob into memory as bytes.
stream = blob_client.download_blob().readall()

# Parse the CSV bytes into a DataFrame and preview the first rows.
df = pd.read_csv(io.BytesIO(stream))
print(df.head())

   listing_id                                              title  \
0    15774123  2012 Perodua Myvi 1.5 SE Hatchback (MONTHLY RM...   
1    15964938  2014 Perodua Myvi 1.3 SE Hatchback (MUKA RM289...   
2    15531831                    2018 Perodua Myvi 1.5 Hatchback   
3    15642030                  2019 Perodua Myvi 1.5 H Hatchback   
4    15678245                  2018 Perodua Myvi 1.3 X Hatchback   

    installment  year variant  mileage transmission        location  \
0  RM 214/month  2012      SE    77500    Automatic  Seri Kembangan   
1  RM 249/month  2014      SE    57500    Automatic  Seri Kembangan   
2  RM 555/month  2018      AV    80118    Automatic     Setiawangsa   
3  RM 576/month  2019       H    57500    Automatic   Petaling Jaya   
4  RM 511/month  2018       X    85519    Automatic   Petaling Jaya   

          state                                      listing_image   price  \
0      Selangor  https://img1.icarcdn.com/32147751/main-m_used-...     328   
1      S

In [14]:
# Remove rows whose state is null, since the data is processed state by state.
# distinct_state = df['state'].unique()
# print(distinct_state)
# df[df['state'].isnull()]
# .copy() makes the result an independent frame, so adding columns to it later
# does not raise SettingWithCopyWarning.
df = df[df['state'].notna()].copy()
# df.count()


In [8]:
from sqlalchemy.engine import create_engine
from datetime import datetime
from sqlalchemy import DateTime, String, Integer, MetaData, Table, Column, text, ForeignKey
from sqlalchemy.types import TEXT


def get_db_connection():
    """Create a SQLAlchemy engine for the Azure SQL database configured in .env.

    Reads SQL_SERVER, DB_NAME, DB_USERNAME and DB_PASSWORD from the environment
    (after loading the .env file) and builds an ``mssql+pyodbc`` URL that uses
    SQL login authentication.

    Returns:
        A new SQLAlchemy ``Engine``. No connection is opened until it is used.

    Raises:
        ValueError: if any of the required environment variables is missing or empty.
    """
    from urllib.parse import quote_plus

    load_dotenv()

    # Load credentials from the .env file.
    required = ("SQL_SERVER", "DB_NAME", "DB_USERNAME", "DB_PASSWORD")
    settings = {name: os.getenv(name) for name in required}
    missing = [name for name, setting in settings.items() if not setting]
    if missing:
        # Fail here with a clear message rather than building a URL containing "None".
        raise ValueError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    server_name = settings["SQL_SERVER"]
    database_name = settings["DB_NAME"]
    # Credentials are embedded in a URL, so characters such as '@', ':' or '/'
    # must be percent-encoded or the URL is parsed incorrectly.
    username = quote_plus(settings["DB_USERNAME"])
    password = quote_plus(settings["DB_PASSWORD"])

    # Connection string using SQL login
    connection_string = (
        f"mssql+pyodbc://{username}:{password}"
        f"@{server_name}.database.windows.net:1433/{database_name}"
        "?driver=ODBC+Driver+18+for+SQL+Server&encrypt=yes&TrustServerCertificate=no&timeout=30"
    )

    # Create the SQLAlchemy engine
    return create_engine(connection_string)

In [11]:
def create_table():
    """Create the ``test_staging`` table in the configured database.

    The table is created through ``MetaData.create_all``. The engine obtained
    for this call is disposed afterwards so its connection pool is released.
    """
    engine = get_db_connection()

    metadata_obj = MetaData()

    # Constructing the Table registers it on metadata_obj; no local reference is needed.
    Table(
        "test_staging",
        metadata_obj,
        Column("listing_id", String(255)),
        Column("title", String(255)),
        Column("installment", String(60)),
        Column("year", String(255)),
        Column("variant", String(50)),
        Column("mileage", String(255)),
        Column("transmission", String(50)),
        Column("color", String(50)),
        Column("location", String(255)),
        Column("state", String(255), nullable=False),
        Column("listing_image", String(255), nullable=False),
        Column("price", String(255), nullable=False),
        Column("url", String(255), nullable=False),
        Column("image", TEXT, nullable=True),
        Column("car_model", String(50), nullable=False),
        Column("processing_date", DateTime, nullable=False),
    )

    try:
        metadata_obj.create_all(engine)
    finally:
        # get_db_connection() builds a fresh engine per call; release its pool.
        engine.dispose()
   

# create_table()
# Clean up

def drop_table():
    """Drop the ``test_staging`` table from the configured database.

    Only the table name is needed to drop it, so the column list is not
    repeated here. The drop is skipped when the table does not exist, and the
    engine obtained for this call is disposed afterwards.
    """
    engine = get_db_connection()

    test_staging = Table("test_staging", MetaData())

    try:
        # checkfirst=True makes the call a no-op when the table is absent.
        test_staging.drop(engine, checkfirst=True)
    finally:
        # get_db_connection() builds a fresh engine per call; release its pool.
        engine.dispose()

# create_table()
# drop_table()


In [16]:
import json
def insert_data(df):
    """Append the rows of ``df`` to the ``test_staging`` table.

    A copy of ``df`` is prepared with three added columns (``processing_date``
    set to the current time, ``color`` set to None, ``car_model`` set to
    'myvi') and with the ``image`` column JSON-encoded. The caller's DataFrame
    is left unchanged.

    Args:
        df: DataFrame to load; must contain an ``image`` column.

    Raises:
        KeyError: if ``df`` has no ``image`` column.
    """
    def _image_to_json(image):
        # Missing cells are None or float NaN; keep them as NULL instead of
        # serializing NaN to the string "NaN".
        if image is None or (isinstance(image, float) and image != image):
            return None
        return json.dumps(image)

    # assign() returns a new frame, so the caller's DataFrame is not mutated.
    staged = df.assign(
        processing_date=datetime.now(),
        color=None,
        car_model='myvi',
        image=df['image'].apply(_image_to_json),
    )

    engine = get_db_connection()
    try:
        # engine.begin() commits on success and rolls back on error; the insert
        # runs on this connection so it is part of that transaction.
        with engine.begin() as connection:
            staged.to_sql('test_staging', con=connection, if_exists='append', index=False)
    finally:
        # get_db_connection() builds a fresh engine per call; release its pool.
        engine.dispose()

# insert_data(df)

In [25]:
import struct

from msal import ConfidentialClientApplication
from sqlalchemy import MetaData, event, select
from sqlalchemy.engine import create_engine

# Connection attribute used by the Microsoft ODBC driver to accept an access token.
SQL_COPT_SS_ACCESS_TOKEN = 1256

# Same environment variables that get_db_connection() reads for the server and database.
server_name = os.getenv("SQL_SERVER")
database_name = os.getenv("DB_NAME")

authority = f"https://login.microsoftonline.com/{adls_tenant_id}"

# Acquire a token for Azure SQL with the service principal (client-credentials flow).
app = ConfidentialClientApplication(adls_client_id, authority=authority, client_credential=adls_client_secret)
token_response = app.acquire_token_for_client(scopes=["https://database.windows.net/.default"])
if "access_token" not in token_response:
    # On failure the response carries error details instead of a token.
    raise RuntimeError(
        "Could not acquire an Azure SQL access token: "
        f"{token_response.get('error')}: {token_response.get('error_description')}"
    )
access_token = token_response["access_token"]

# Token authentication: no username/password and no `authentication=` keyword in the URL.
connection_string = (
    f"mssql+pyodbc://@{server_name}.database.windows.net:1433/{database_name}"
    "?driver=ODBC+Driver+18+for+SQL+Server&encrypt=yes&TrustServerCertificate=no"
)

engine = create_engine(connection_string)


@event.listens_for(engine, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    """Attach the access token to every new pyodbc connection."""
    # SQLAlchemy adds Trusted_Connection when the URL has no credentials;
    # it must be removed for token authentication.
    cargs[0] = cargs[0].replace(";Trusted_Connection=Yes", "")

    # The driver expects a length-prefixed UTF-16-LE byte structure.
    raw_token = access_token.encode("utf-16-le")
    token_struct = struct.pack(f"<I{len(raw_token)}s", len(raw_token), raw_token)
    cparams["attrs_before"] = {SQL_COPT_SS_ACCESS_TOKEN: token_struct}


metadata = MetaData()
metadata.reflect(bind=engine)

table = metadata.tables['test_table']
query = select(table)

# NOTE(review): the recorded "Login timeout expired" error may also indicate the
# server is unreachable (server name / firewall rules) -- confirm separately.
with engine.connect() as connection:
    result = connection.execute(query)
    for row in result:
        print(row)


OperationalError: (pyodbc.OperationalError) ('HYT00', '[HYT00] [Microsoft][ODBC Driver 18 for SQL Server]Login timeout expired (0) (SQLDriverConnect)')
(Background on this error at: https://sqlalche.me/e/20/e3q8)