# Data Engineering
Designing, constructing and maintaining of data pipelines. 

## Data Ingestion
- Acquiring data form different sources and transfer them into a storage
- API, Databases, files, Streams, Webscrapping...


In [1]:
# ingesting data from csv file 
import pandas as pd
df = pd.read_csv('data/car_prices.csv')

In [2]:
df.head()

Unnamed: 0,year,make,model,trim,body,transmission,vin,state,condition,odometer,color,interior,seller,mmr,sellingprice,saledate
0,2015,Kia,Sorento,LX,SUV,automatic,5xyktca69fg566472,ca,5.0,16639.0,white,black,kia motors america inc,20500.0,21500.0,Tue Dec 16 2014 12:30:00 GMT-0800 (PST)
1,2015,Kia,Sorento,LX,SUV,automatic,5xyktca69fg561319,ca,5.0,9393.0,white,beige,kia motors america inc,20800.0,21500.0,Tue Dec 16 2014 12:30:00 GMT-0800 (PST)
2,2014,BMW,3 Series,328i SULEV,Sedan,automatic,wba3c1c51ek116351,ca,45.0,1331.0,gray,black,financial services remarketing (lease),31900.0,30000.0,Thu Jan 15 2015 04:30:00 GMT-0800 (PST)
3,2015,Volvo,S60,T5,Sedan,automatic,yv1612tb4f1310987,ca,41.0,14282.0,white,black,volvo na rep/world omni,27500.0,27750.0,Thu Jan 29 2015 04:30:00 GMT-0800 (PST)
4,2014,BMW,6 Series Gran Coupe,650i,Sedan,automatic,wba6b2c57ed129731,ca,43.0,2641.0,gray,black,financial services remarketing (lease),66000.0,67000.0,Thu Dec 18 2014 12:30:00 GMT-0800 (PST)


## Data processing

- Involves Transforming the data, Cleaning the data
    - Data Cleaning
    - Normalization
    - Aggregation
    - Data Enrichment - Augumentations...

In [3]:
# Remove missing values
df_cleaned = df.dropna()

In [4]:
df_cleaned

Unnamed: 0,year,make,model,trim,body,transmission,vin,state,condition,odometer,color,interior,seller,mmr,sellingprice,saledate
0,2015,Kia,Sorento,LX,SUV,automatic,5xyktca69fg566472,ca,5.0,16639.0,white,black,kia motors america inc,20500.0,21500.0,Tue Dec 16 2014 12:30:00 GMT-0800 (PST)
1,2015,Kia,Sorento,LX,SUV,automatic,5xyktca69fg561319,ca,5.0,9393.0,white,beige,kia motors america inc,20800.0,21500.0,Tue Dec 16 2014 12:30:00 GMT-0800 (PST)
2,2014,BMW,3 Series,328i SULEV,Sedan,automatic,wba3c1c51ek116351,ca,45.0,1331.0,gray,black,financial services remarketing (lease),31900.0,30000.0,Thu Jan 15 2015 04:30:00 GMT-0800 (PST)
3,2015,Volvo,S60,T5,Sedan,automatic,yv1612tb4f1310987,ca,41.0,14282.0,white,black,volvo na rep/world omni,27500.0,27750.0,Thu Jan 29 2015 04:30:00 GMT-0800 (PST)
4,2014,BMW,6 Series Gran Coupe,650i,Sedan,automatic,wba6b2c57ed129731,ca,43.0,2641.0,gray,black,financial services remarketing (lease),66000.0,67000.0,Thu Dec 18 2014 12:30:00 GMT-0800 (PST)
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
558831,2011,BMW,5 Series,528i,Sedan,automatic,wbafr1c53bc744672,fl,39.0,66403.0,white,brown,lauderdale imports ltd bmw pembrok pines,20300.0,22800.0,Tue Jul 07 2015 06:15:00 GMT-0700 (PDT)
558833,2012,Ram,2500,Power Wagon,Crew Cab,automatic,3c6td5et6cg112407,wa,5.0,54393.0,white,black,i -5 uhlmann rv,30200.0,30800.0,Wed Jul 08 2015 09:30:00 GMT-0700 (PDT)
558834,2012,BMW,X5,xDrive35d,SUV,automatic,5uxzw0c58cl668465,ca,48.0,50561.0,black,black,financial services remarketing (lease),29800.0,34000.0,Wed Jul 08 2015 09:30:00 GMT-0700 (PDT)
558835,2015,Nissan,Altima,2.5 S,sedan,automatic,1n4al3ap0fc216050,ga,38.0,16658.0,white,black,enterprise vehicle exchange / tra / rental / t...,15100.0,11100.0,Thu Jul 09 2015 06:45:00 GMT-0700 (PDT)


## Data Storage
- It involves storing the data in an efficient manner so that we can retrieve the data for analysis

- Common Storage solutions 
    - Database - MySQL
    - NoSQL Database
    - Data lakes
    - Distriuted File systems
    

In [5]:
! pip install sqlalchemy



In [6]:
!pip install mysql-connector-python



In [7]:
!pip install mysqlclient



In [8]:
# Create a database
import mysql.connector

# Create a function to create a database
def create_database(host, username, password, database_name):
    try:
        #connect to mySQL server
        connection = mysql.connector.connect(
            host = host,
            user = username,
            password = password
        )
        
        # Create a cursor object
        cursor = connection.cursor()
        
        # Execute an sql query to create a database
        cursor.execute(f"CREATE DATABASE {database_name}")
        
        print(f"Database '{database_name}' created successfully")
        
    except mysql.connector.Error as error:
        print("Error creating database:", error)
        
    finally:
        # Cloose the cursor and the connection
        if 'connection' in locals():
            cursor.close()
            connection.close()
            
# MySQL Server connection Parameters
host = 'localhost' # Change as your IP Address ..
username = 'root'
password = 'meekmilly'
database_name = 'car_prices'

# Call the function 
create_database(host, username, password, database_name)
        

Error creating database: 1007 (HY000): Can't create database 'car_prices'; database exists


In [9]:
from sqlalchemy import create_engine

# Create a connection to mysql database
engine = create_engine('mysql://root:meekmilly@localhost/car_prices')

# Store DataFrame in MySQL database
df_cleaned.to_sql('table_name', engine, if_exists='replace', index=False)

## Data Pipeline Orchestration 
- Scheduling and managing the execution od data workflows


In [None]:

pip install "apache-airflow[celery]==2.8.1" 

In [11]:
# Apache Airflow for data scheduling
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def data_processing():
    # your data processing code here
    pass

dag = DAG('data_pipeline', description='Data pipeline', schedule_interval = '@daily', start_date = datetime(2024,1, 1))

data_processing_task = PythonOperator(task_id = 'data_processing', python_callable=data_processing, dag = dag)



AirflowConfigException: Cannot use relative path: `sqlite:///C:\Users\gomez/airflow/airflow.db` to connect to sqlite. Please use absolute path such as `sqlite:////tmp/airflow.db`.

In [None]:
pip show sqlalchemy