## Imports

import requests
import json
import os
import schedule
import time
from datetime import datetime
import pytz
import pyodbc
import pandas as pd
from azure.storage.filedatalake import DataLakeServiceClient
import logging

## Configure logging to console and file

## Logging config

In [None]:

# Root logging setup: records at INFO and above are written to 'pipeline.log'
# and echoed to the console stream, both using the same line format.
logging.basicConfig(
    handlers=[logging.FileHandler('pipeline.log'), logging.StreamHandler()],
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)

## Fetching Country Data

In [3]:

# Search terms appended to base_url, one request per entry.
# NOTE(review): the "/name/" endpoint looks like a name search, so short terms
# such as 'us' or 'uk' may return several countries — confirm against the API.
countries = [
    'india',
    'us',
    'uk',
    'china',
    'russia',
]

# REST Countries endpoint; the search term is concatenated directly after it.
base_url = 'https://restcountries.com/v3.1/name/'

# Local directory that receives one "<term>.json" file per entry above.
country_output_dir = 'country_data'

## Database and ADLS info


# Connection settings are taken from the environment so that no credential is
# stored in source control.
# NOTE: a storage account key was previously committed in this file; treat it
# as compromised and rotate it.

# ODBC connection string for the source SQL Server database. The default keeps
# the original template; replace <server>/<database> or set SQL_CONNECTION_STRING.
sql_connection_string = os.environ.get(
    "SQL_CONNECTION_STRING",
    "Driver={SQL Server};Server=<server>;Database=<database>;Trusted_Connection=yes;",
)

# Storage account name and key (set ADLS_ACCOUNT_NAME / ADLS_ACCOUNT_KEY).
# Both default to an empty string when the variable is not set.
adls_account_name = os.environ.get("ADLS_ACCOUNT_NAME", "")
adls_account_key = os.environ.get("ADLS_ACCOUNT_KEY", "")

# ADLS file system (container) that receives the parquet files.
adls_filesystem = "data"

# Source tables read from the database.
customer_table = "CustomerTable"
product_table = "ProductTable"

# Target directories inside the ADLS file system.
customer_output_path = "customer_data"
product_output_path = "product_data"

## Initialize ADLS client

In [5]:

def get_adls_client():
    """Create a DataLakeServiceClient for the configured storage account.

    The account name and key come from the module-level ``adls_account_name``
    and ``adls_account_key`` settings rather than being embedded here.

    Returns:
        The service client, or None if constructing it raised; the error is
        logged in that case.
    """
    try:
        client = DataLakeServiceClient(
            account_url=f"https://{adls_account_name}.dfs.core.windows.net",
            credential=adls_account_key,
        )
        logger.info("Successfully initialized ADLS client")
        return client
    except Exception as e:
        # Best-effort: callers check for None instead of handling exceptions.
        logger.error(f"Error init.: {e}")
        return None

## Fetch & Save Data

In [6]:

def fetch_country_data(country):
    """Download the API response for one search term and save it as JSON.

    The response body is written to ``<country_output_dir>/<country>.json``,
    creating the directory if needed. Failures are logged, not raised, so one
    bad country does not stop the remaining downloads.

    Args:
        country: Search term appended to ``base_url``; also used as the
            output file name.
    """
    try:
        # Without a timeout a stalled connection would block the caller
        # (and therefore the scheduler loop) indefinitely.
        response = requests.get(f"{base_url}{country}", timeout=30)
        response.raise_for_status()
        data = response.json()
        os.makedirs(country_output_dir, exist_ok=True)
        filename = os.path.join(country_output_dir, f"{country}.json")
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
        logger.info(f"The Data Is Saved Successfully For {country} To {filename}")
    except requests.exceptions.RequestException as e:
        # Covers connection errors, timeouts and non-2xx status codes.
        logger.error(f"Error in fetching data for {country}: {e}")
    except json.JSONDecodeError as e:
        logger.error(f"Error in decoding JSON for {country}: {e}")
    except IOError as e:
        logger.error(f"Error in writing file for {country}: {e}")

## Fetch customer count

In [7]:

def get_customer_count():
    """Return the number of rows in ``customer_table``.

    Connects with the module-level ``sql_connection_string``.

    Returns:
        The row count as a plain ``int``, or 0 if connecting or querying
        failed (the error is logged).
    """
    try:
        conn = pyodbc.connect(sql_connection_string)
        try:
            query = f"SELECT COUNT(*) AS CustomerCount FROM {customer_table}"
            # int() turns the pandas/numpy scalar into a built-in int.
            customer_count = int(pd.read_sql(query, conn).iloc[0]["CustomerCount"])
        finally:
            # Close the connection even when the query raises.
            conn.close()
        logger.info(f"Customer count: {customer_count}")
        return customer_count
    except Exception as e:
        # Best-effort: a failure is reported as a count of 0, which makes the
        # caller skip the copy.
        logger.error(f"Error in fetching customer count: {e}")
        return 0

## Copy to ADLS

In [8]:
def copy_to_adls(data, filesystem, directory, filename):
    """Upload a DataFrame to ADLS as a parquet file.

    Failures are logged, not raised.

    Args:
        data: Object exposing ``to_parquet()`` that returns the file bytes
            (the callers in this file pass pandas DataFrames).
        filesystem: Name of the ADLS file system to write into.
        directory: Directory inside the file system.
        filename: Name of the file to create in that directory.
    """
    try:
        adls_client = get_adls_client()
        if adls_client is None:
            logger.error("ADLS client Error...")
            return

        # Serialize once, before touching remote storage: the same bytes are
        # used for the upload and its length, and a serialization failure no
        # longer leaves an empty remote file behind.
        payload = data.to_parquet()

        file_system_client = adls_client.get_file_system_client(filesystem)
        directory_client = file_system_client.get_directory_client(directory)
        directory_client.create_directory()
        file_client = directory_client.get_file_client(filename)
        file_client.create_file()
        # Append the whole payload at offset 0, then flush at the total length.
        file_client.append_data(payload, 0)
        file_client.flush_data(len(payload))
        logger.info(f"Data copied : {directory}/{filename}")
    except Exception as e:
        logger.error(f"Error copying to ADLS: {e}")

## Parent pipeline

In [9]:
# Copy customer data
def copy_customer_data():
    """Copy the customer table to ADLS when it has more than 500 rows.

    Reads the whole ``customer_table``, uploads it as
    ``customer_<YYYYmmddHHMMSS>.parquet`` under ``customer_output_path``, and
    then calls ``copy_product_data`` with the customer count. Errors are
    logged, not raised.
    """
    customer_count = get_customer_count()
    if customer_count > 500:
        try:
            conn = pyodbc.connect(sql_connection_string)
            try:
                query = f"SELECT * FROM {customer_table}"
                customer_data = pd.read_sql(query, conn)
            finally:
                # Close the connection even when the query raises.
                conn.close()
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            filename = f"customer_{timestamp}.parquet"
            copy_to_adls(customer_data, adls_filesystem, customer_output_path, filename)
            # NOTE(review): copy_to_adls logs its own failures instead of
            # raising, so this message is emitted even if the upload failed.
            logger.info("Customer data copied successfully")
            # Call child pipeline logic, passing customer count
            copy_product_data(customer_count)
        except Exception as e:
            logger.error(f"Error copying customer data: {e}")
    else:
        logger.info(f"Customer count ({customer_count}) <= 500, skipping copy")

## Child pipeline

In [10]:
# Copy product data
def copy_product_data(customer_count):
    """Copy the product table to ADLS when the customer count exceeds 600.

    Reads the whole ``product_table`` and uploads it as
    ``product_<YYYYmmddHHMMSS>.parquet`` under ``product_output_path``.
    Errors are logged, not raised.

    Args:
        customer_count: Customer row count supplied by the parent pipeline.
    """
    if customer_count > 600:
        try:
            conn = pyodbc.connect(sql_connection_string)
            try:
                query = f"SELECT * FROM {product_table}"
                product_data = pd.read_sql(query, conn)
            finally:
                # Close the connection even when the query raises.
                conn.close()
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            filename = f"product_{timestamp}.parquet"
            copy_to_adls(product_data, adls_filesystem, product_output_path, filename)
            # NOTE(review): copy_to_adls logs its own failures instead of
            # raising, so this message is emitted even if the upload failed.
            logger.info("Product data copied successfully")
        except Exception as e:
            logger.error(f"Error copying product data: {e}")
    else:
        logger.info(f"Customer count ({customer_count}) <= 600, skipping product copy")

## Fetch country data and copy database tables to ADLS

In [11]:

def job():
    """Run one pipeline pass: fetch every country, then copy DB data to ADLS."""
    # Stage 1: one download per entry in the module-level `countries` list.
    logger.info("Starting country data fetch")
    for country_name in countries:
        fetch_country_data(country_name)
    logger.info("Completed country data fetch")

    # Stage 2: the customer copy, which may in turn trigger the product copy.
    logger.info("Starting database to ADLS copy")
    copy_customer_data()
    logger.info("Completed database to ADLS copy")

## Main

In [None]:
def main():
    """Run the job once immediately, then at 00:00 and 12:00 IST every day.

    Blocks forever, polling the scheduler once a minute.

    Raises:
        Exception: Any error escaping the job or the scheduler loop is logged
            and re-raised.
    """
    try:
        logger.info("Output")
        job()

        # Format: 12:00 AM and 12:00 PM IST
        # The timezone must be passed to .at(); assigning a `timezone`
        # attribute on the returned job is ignored by the scheduler, which
        # would then fire at the host's local 00:00 / 12:00 instead.
        ist = pytz.timezone('Asia/Kolkata')
        for run_at in ("00:00", "12:00"):
            schedule.every().day.at(run_at, ist).do(job).tag('data-job')
        logger.info("The Scheduler started. Waiting for scheduled times (12:00 AM and 12:00 PM IST)...")

        while True:
            schedule.run_pending()
            time.sleep(60)
    except Exception as e:
        logger.error(f"Error in main: {e}")
        raise

# Start the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()