# 1. COPY DATA FROM DATABASE

### IMPORTING LIBRARIES

In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from fastavro import writer, parse_schema
from sqlalchemy import create_engine

### CREATING ENGINE

In [2]:
from urllib.parse import quote_plus

from sqlalchemy import create_engine

# Connection settings for the source MySQL database.
# These names are reused by later cells, so they hold the raw (unescaped)
# values; escaping happens only where the URL is assembled.
username = 'poulami'
password = 'your_password'  # placeholder value
host = 'localhost'  # or '127.0.0.1'
port = '3306'  # Default MySQL port
dbname = 'master'

# Create the engine.
# The credentials are percent-encoded so that characters such as '@', ':' or
# '/' in a real password cannot be misread as URL delimiters.
engine = create_engine(
    f'mysql+pymysql://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/{dbname}'
)


### TESTING CONNECTION

In [3]:
import pandas as pd

# Smoke-test the connection: load the whole client_master table into `df`
# (later cells export this DataFrame) and print its first rows.
query = 'SELECT * FROM client_master'
df = pd.read_sql(query, engine)
preview = df.head()
print(preview)

  clientno            name addresss1 address2       city  pincode  \
0   C00001    Ivan Bayross        a1       a2     Mumbai   400054   
1   C00002  Mamta Muzumdar        a1       a2     Madras   780001   
2   C00003   Chhaya Bankar        a1       a2     Mumbai   400057   
3   C00004   Ashwini Joshi        a1       a2  Bangalore   560001   
4   C00005   Hansel Colaco        a1       a2     Mumbai   400060   

         state   baldue  
0  Maharashtra  15000.0  
1   Tamil Nadu      0.0  
2  Maharashtra   5000.0  
3    Karnataka      0.0  
4  Maharashtra   2000.0  


### CONVERSION TO CSV

In [4]:
# Write the DataFrame to a CSV file, leaving the DataFrame index out of it.
csv_path = 'output.csv'
df.to_csv(csv_path, index=False)

### CONVERSION TO PARQUET

In [5]:
# Convert the DataFrame to an Arrow table, then write that table as Parquet.
arrow_table = pa.Table.from_pandas(df)
pq.write_table(arrow_table, where='output.parquet')

### CONVERSION TO AVRO

In [6]:
def get_avro_type(dtype):
    """Return the Avro type to use for a column of the given pandas dtype.

    Integer dtypes map to ``'long'`` and float dtypes to ``'double'``: Avro's
    ``int`` and ``float`` are 32-bit, which is too narrow for the 64-bit
    integers and floats pandas uses by default. The wider Avro types hold
    narrower pandas dtypes (e.g. ``int32``, ``float32``) without loss.

    Args:
        dtype: A pandas/NumPy dtype (anything accepted by the
            ``pd.api.types.is_*_dtype`` helpers).

    Returns:
        ``'long'``, ``'double'``, ``'boolean'`` or ``'string'``, or a dict
        describing a ``timestamp-micros`` logical type for datetime dtypes.
        Any dtype not recognised as integer, float, boolean or datetime
        falls back to ``'string'``.
    """
    if pd.api.types.is_integer_dtype(dtype):
        return 'long'
    if pd.api.types.is_float_dtype(dtype):
        return 'double'
    if pd.api.types.is_bool_dtype(dtype):
        return 'boolean'
    if pd.api.types.is_datetime64_any_dtype(dtype):
        return {'type': 'long', 'logicalType': 'timestamp-micros'}
    # String/object columns and anything unrecognised are written as strings.
    return 'string'

# Describe each DataFrame column as an Avro field, typed from its pandas dtype.
fields = []
for col, dtype in df.dtypes.items():
    fields.append({'name': col, 'type': get_avro_type(dtype)})

# Record schema wrapping the per-column fields.
schema = {
    'doc': 'Schema for your_table',
    'name': 'your_table',
    'namespace': 'namespace',
    'type': 'record',
    'fields': fields,
}

# One dict per row, keyed by column name.
records = df.to_dict(orient='records')

# Avro is a binary format, so the file is opened in binary mode.
with open('output.avro', 'wb') as avro_file:
    writer(avro_file, parse_schema(schema), records)

# 2. CONFIGURE SCHEDULE TRIGGERS, EVENT TRIGGERS

### DIRECTED ACYCLIC GRAPH

In [9]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def export_data():
    """Export the ``client_master`` table to CSV, Parquet and Avro files.

    Reads the whole table from the MySQL database into a DataFrame and writes
    it to ``output.csv``, ``output.parquet`` and ``output.avro``. The paths
    are relative, so the files land in the process's current working
    directory.

    Relies on ``create_engine``, ``pd``, ``pa``, ``pq``, ``writer``,
    ``parse_schema`` and ``get_avro_type`` being available at module level.
    """
    from urllib.parse import quote_plus

    # Define the connection settings
    username = 'poulami'
    password = 'your_password'  # placeholder value
    host = 'localhost'  # or '127.0.0.1'
    port = '3306'  # Default MySQL port
    dbname = 'master'

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # in a real password cannot be misread as URL delimiters.
    url = (
        f'mysql+pymysql://{quote_plus(username)}:{quote_plus(password)}'
        f'@{host}:{port}/{dbname}'
    )
    engine = create_engine(url)
    try:
        df = pd.read_sql('SELECT * FROM client_master', engine)
    finally:
        # A new engine is created on every call; release its connection pool
        # as soon as the data has been read.
        engine.dispose()

    # CSV, without the DataFrame index.
    df.to_csv('output.csv', index=False)

    # Parquet, via an intermediate Arrow table.
    table = pa.Table.from_pandas(df)
    pq.write_table(table, 'output.parquet')

    # Avro: one field per DataFrame column, typed from the column's dtype.
    schema = {
        'doc': 'Schema for your_table',
        'name': 'your_table',
        'namespace': 'namespace',
        'type': 'record',
        'fields': [
            {'name': col, 'type': get_avro_type(dtype)}
            for col, dtype in zip(df.columns, df.dtypes)
        ],
    }
    records = df.to_dict(orient='records')

    with open('output.avro', 'wb') as out:
        writer(out, parse_schema(schema), records)
    
    

# Defaults applied to the tasks of the DAG below.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # retry a failed task once...
    'retry_delay': timedelta(minutes=5),  # ...after waiting five minutes
}

# Daily export pipeline.
dag = DAG(
    'export_data_pipeline',
    default_args=default_args,
    description='A simple data export pipeline',
    schedule_interval=timedelta(days=1),
    # start_date is in the past; without this the scheduler would create one
    # run per missed day. export_data always rewrites the same output files,
    # so those backfill runs would only repeat the same work.
    catchup=False,
)

# Single task: run export_data as a Python callable.
export_task = PythonOperator(
    task_id='export_data',
    python_callable=export_data,
    dag=dag,
)

# 3. COPY ALL TABLES TO ANOTHER DATABASE

In [11]:

from urllib.parse import quote_plus

from sqlalchemy import inspect

# Source and destination database connections.
# The credentials are percent-encoded so characters such as '@', ':' or '/'
# in a real password cannot be misread as URL delimiters.
credentials = f'{quote_plus(username)}:{quote_plus(password)}'
source_engine = create_engine(f'mysql+pymysql://{credentials}@{host}:{port}/{dbname}')
dest_db = 'new_master'
dest_engine = create_engine(f'mysql+pymysql://{credentials}@{host}:{port}/{dest_db}')

# Get list of tables.
# Engine.table_names() was removed in SQLAlchemy 2.0; the inspector API is
# available in both 1.x and 2.x.
tables = inspect(source_engine).get_table_names()

# Copy each table: read it fully into a DataFrame, then write it to the
# destination, replacing any table of the same name that already exists there.
for table in tables:
    df = pd.read_sql_table(table, source_engine)
    df.to_sql(table, dest_engine, index=False, if_exists='replace')


# 4. COPY SELECTED COLUMNS FROM SELECTED TABLES TO NEW DATABASE

In [13]:
from urllib.parse import quote_plus

# Source and destination database connections.
# The credentials are percent-encoded so characters such as '@', ':' or '/'
# in a real password cannot be misread as URL delimiters.
credentials = f'{quote_plus(username)}:{quote_plus(password)}'
source_engine = create_engine(f'mysql+pymysql://{credentials}@{host}:{port}/{dbname}')
dest_db = 'new_sel_master'
dest_engine = create_engine(f'mysql+pymysql://{credentials}@{host}:{port}/{dest_db}')

# Define tables and columns to copy (table name -> list of column names)
tables_and_columns = {
    'client_master': ['clientno', 'name'],
    'sales_order': ['orderno', 'clientno']
}

# Copy each specified table and columns: select only the listed columns from
# the source, then write them to a same-named table in the destination,
# replacing it if it already exists.
for table, columns in tables_and_columns.items():
    query = f"SELECT {', '.join(columns)} FROM {table}"
    df = pd.read_sql(query, source_engine)
    df.to_sql(table, dest_engine, index=False, if_exists='replace')