<a href="https://colab.research.google.com/github/eaglebaba/ETL_PROCESS/blob/main/Data_Engineering_from_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from datetime import datetime, timedelta
import sqlite3

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# Aggregate raw trips into monthly weekend statistics and store them in SQLite.
def _quote_identifier(name):
    """Return *name* quoted as an SQLite identifier.

    Table names cannot be bound as query parameters, so they are quoted
    (embedded double quotes doubled) before being spliced into SQL text.
    """
    return '"' + name.replace('"', '""') + '"'


def _weekday_stats_sql(prefix, weekday):
    """Return the three SELECT expressions for one weekday.

    *weekday* is the ``strftime('%w')`` code: '0' is Sunday, '6' is Saturday.
    """
    on_day = f"strftime('%w', pickup_datetime) = '{weekday}'"
    # strftime('%s') is Unix seconds, so the difference is the duration in seconds.
    duration = "strftime('%s', dropoff_datetime) - strftime('%s', pickup_datetime)"
    # CASE without ELSE yields NULL on other days. COUNT and AVG skip NULLs,
    # so each statistic only sees trips that started on this weekday.
    return (
        f"ROUND(COUNT(CASE WHEN {on_day} THEN 1 END) * 1.0 / "
        f"NULLIF(COUNT(DISTINCT CASE WHEN {on_day} THEN date(pickup_datetime) END), 0), 1) "
        f"AS {prefix}_mean_trip_count, "
        f"ROUND(AVG(CASE WHEN {on_day} THEN fare_amount END), 1) "
        f"AS {prefix}_mean_fare_per_trip, "
        f"ROUND(AVG(CASE WHEN {on_day} THEN {duration} END), 1) "
        f"AS {prefix}_mean_duration_per_trip"
    )


def _monthly_weekend_stats_sql(source):
    """Return the aggregation query reading from the already-quoted table *source*."""
    return (
        "SELECT strftime('%Y-%m', pickup_datetime) AS Month, "
        + _weekday_stats_sql("sat", "6")
        + ", "
        + _weekday_stats_sql("sun", "0")
        + f" FROM {source}"
        # Half-open range: every timestamp in 2014-2016, including fractional seconds.
        " WHERE pickup_datetime >= '2014-01-01' AND pickup_datetime < '2017-01-01'"
        " GROUP BY strftime('%Y-%m', pickup_datetime)"
        " ORDER BY Month"
    )


def run_sql_and_write_to_sqlite(
    db_path="Trips.db",
    source_table="tripdata",
    target_table="tripdata",
):
    """Compute monthly Saturday/Sunday trip statistics and rewrite them to SQLite.

    For each calendar month from 2014-01 through 2016-12, and separately for
    Saturdays and Sundays, the query computes:

    * ``*_mean_trip_count``: trips on that weekday divided by the number of
      distinct dates of that weekday that had at least one trip in the month.
    * ``*_mean_fare_per_trip``: average ``fare_amount`` over that weekday's trips.
    * ``*_mean_duration_per_trip``: average trip duration in seconds.

    Values are rounded to one decimal place. A month with no trips on a given
    weekday gets NULL in that weekday's columns.

    The result replaces the whole contents of ``target_table`` in one
    transaction. If the delete or insert fails, it is rolled back and the
    previous contents remain.

    Args:
        db_path: Path of the SQLite database file.
        source_table: Table holding raw trips. It must have
            ``pickup_datetime``, ``dropoff_datetime`` and ``fare_amount``
            columns.
        target_table: Existing 7-column table that receives the rows
            (``Month`` followed by the six statistics above, in that order).

    NOTE(review): both tables default to ``tripdata`` to match the original
    code. However, the notebook creates ``tripdata`` with the aggregate schema
    (no ``pickup_datetime`` column), so the raw trips presumably live in a
    differently named table. Pass that name via ``source_table``.

    NOTE(review): timestamps are assumed to be ISO-8601 text
    (``YYYY-MM-DD HH:MM:SS``). The range filter and SQLite's date functions
    rely on this; confirm against the raw data.
    """
    query = _monthly_weekend_stats_sql(_quote_identifier(source_table))
    target = _quote_identifier(target_table)

    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commit on success, roll back on any exception
            # Materialise the results BEFORE clearing the target. Re-using a
            # cursor for the DELETE would discard the pending SELECT results.
            rows = conn.execute(query).fetchall()
            conn.execute(f"DELETE FROM {target}")
            conn.executemany(f"INSERT INTO {target} VALUES (?, ?, ?, ?, ?, ?, ?)", rows)
    finally:
        conn.close()

# Default arguments inherited by every task in the DAG.
default_args = {
    'owner': 'Michael_Adeyeye',
    # Fixed start date in the past. Airflow advises against dynamic start
    # dates, and airflow.utils.dates.days_ago (used previously) is deprecated.
    'start_date': datetime(2024, 1, 1),
    'retries': 1,                           # retry a failed task once
    'retry_delay': timedelta(minutes=5),    # ...after waiting five minutes
}

# Manually triggered DAG: no schedule and no catch-up runs for past intervals.
# NOTE(review): on Airflow >= 2.4, `schedule=None` replaces the deprecated
# `schedule_interval` argument.
with DAG(
    'sqlite_data_pipeline',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    tags=['any'],
) as dag:
    # The only task runs the aggregation and writes it to SQLite.
    # With a single task there are no dependencies to declare.
    run_sql_task = PythonOperator(
        task_id='run_sql_and_write_to_sqlite',
        python_callable=run_sql_and_write_to_sqlite,
    )


In [None]:
# Install Airflow and its SQLite provider (real PyPI distribution names; the
# DAG cell imports `airflow`, so run this cell before that one).
!pip install apache-airflow apache-airflow-providers-sqlite

In [1]:
import sqlite3
# Create (or open) the SQLite database file and make sure the 7-column
# aggregate table that run_sql_and_write_to_sqlite inserts into exists.
conn = sqlite3.connect('Trips.db')
try:
    cursor = conn.cursor()
    # IF NOT EXISTS keeps this cell re-runnable. A plain CREATE TABLE raises
    # sqlite3.OperationalError once the table is already present.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS tripdata (
            Month TEXT,
            sat_mean_trip_count REAL,
            sat_mean_fare_per_trip REAL,
            sat_mean_duration_per_trip REAL,
            sun_mean_trip_count REAL,
            sun_mean_fare_per_trip REAL,
            sun_mean_duration_per_trip REAL
        )
    ''')
    conn.commit()
finally:
    # Release the database file even if the DDL fails.
    conn.close()