Data engineering is a field that focuses on designing, constructing, and maintaining data pipelines and systems to efficiently collect, process, store, and analyze large volumes of data. It involves working with various technologies and tools to ensure the reliability, scalability, and performance of data infrastructure.

Data Ingestion:

Data ingestion involves acquiring data from various sources and transferring it into a storage system for further processing. Common data sources include databases, APIs, files, streams, and websites (via web scraping).

In [3]:
# Example Data Ingestion
import pandas as pd

# Read CSV file into DataFrame
df = pd.read_csv('Data/car_prices.csv')
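For files too large to load in one go, read_csv can also stream a file in pieces through its chunksize parameter. The following is a minimal sketch that assumes a small in-memory CSV with hypothetical columns as a stand-in for Data/car_prices.csv:

```python
import io

import pandas as pd

# Hypothetical sample data standing in for Data/car_prices.csv
csv_text = """make,model,year,sellingprice
Kia,Sorento,2015,21500
BMW,3 Series,2014,30000
Nissan,Altima,2015,10900
"""

# With chunksize, read_csv returns an iterator of DataFrames
# (here 2 rows each) instead of a single DataFrame
reader = pd.read_csv(io.StringIO(csv_text), chunksize=2)

# Each chunk could be cleaned or written out on its own; here the
# chunks are simply concatenated back into one DataFrame
df_sample = pd.concat(reader, ignore_index=True)
print(df_sample.shape)  # (3, 4)
```

On a real multi-gigabyte file, you would typically process or store each chunk inside a loop rather than concatenating everything back into memory.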

Data Processing:

Data processing involves transforming and cleaning raw data to make it usable for analysis. This may include tasks such as data cleaning, normalization, aggregation, and enrichment.

In [5]:
# Remove every row that contains at least one missing value
df_cleaned = df.dropna()
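Because dropna() discards any row with even one missing value, it can remove a large share of a real dataset. The other steps mentioned above can be sketched on a small hypothetical DataFrame: filling missing prices instead of dropping rows, normalizing inconsistent text values, and aggregating. The column names and the median-fill strategy are illustrative assumptions, not the author's method:

```python
import pandas as pd

# Hypothetical raw data; None marks a missing value
raw = pd.DataFrame({
    "make": ["Kia", "kia ", "BMW", "BMW", None],
    "sellingprice": [21500.0, 19000.0, 30000.0, None, 12000.0],
})

# Cleaning: drop rows without a make, fill missing prices with the median price
cleaned = raw.dropna(subset=["make"]).copy()
median_price = cleaned["sellingprice"].median()
cleaned["sellingprice"] = cleaned["sellingprice"].fillna(median_price)

# Normalization: trim whitespace and unify case so "kia " and "Kia" match
cleaned["make"] = cleaned["make"].str.strip().str.upper()

# Aggregation: average selling price per make
avg_price = cleaned.groupby("make")["sellingprice"].mean()
print(avg_price)
```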



Data Storage:

Data storage involves storing processed data in a structured manner to enable efficient retrieval and analysis. Common storage solutions include relational databases, NoSQL databases, data lakes, and distributed file systems.
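You can try the idea without a database server by using SQLite, a file-based relational database whose driver (sqlite3) ships with Python. This is a minimal sketch that uses a hypothetical two-row DataFrame in place of the cleaned car data. pandas accepts a plain sqlite3 connection in both to_sql and read_sql:

```python
import sqlite3

import pandas as pd

# Hypothetical stand-in for the cleaned car price data
df_demo = pd.DataFrame({"make": ["BMW", "Kia"], "sellingprice": [30000, 21500]})

# ":memory:" creates a temporary database that disappears when closed;
# pass a file path instead to persist the data
conn = sqlite3.connect(":memory:")

# Write the DataFrame as a table, replacing it if it already exists
df_demo.to_sql("car_prices", conn, if_exists="replace", index=False)

# Query it back with SQL to confirm the round trip
df_back = pd.read_sql("SELECT make, sellingprice FROM car_prices ORDER BY sellingprice", conn)
print(df_back)

conn.close()
```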

In [7]:
%pip install sqlalchemy



In [8]:
%pip install mysql-connector-python

Note: you may need to restart the kernel to use updated packages.


In [9]:
import mysql.connector

def create_database(host, username, password, database_name):
    """Create a new MySQL database on the given server."""
    connection = None
    cursor = None
    try:
        # Connect to the MySQL server (no database selected yet)
        connection = mysql.connector.connect(
            host=host,
            user=username,
            password=password
        )

        # Create a cursor object
        cursor = connection.cursor()

        # Database names cannot be passed as query parameters, so quote the
        # identifier with backticks; this fails if the database already exists
        cursor.execute(f"CREATE DATABASE `{database_name}`")

        print(f"Database '{database_name}' created successfully.")

    except mysql.connector.Error as error:
        print("Error creating database:", error)

    finally:
        # Close whatever was opened, even if an error occurred part-way
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()

# MySQL server connection parameters
host = 'localhost'  # or your MySQL server IP address
username = 'root'
password = 'desmondx'
database_name = 'car_price'

# Call the function to create the database
create_database(host, username, password, database_name)


Database 'car_price' created successfully.


In [12]:
%pip install mysqlclient


In [13]:
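from sqlalchemy import create_engine

# Create a SQLAlchemy engine for the car_price database; the plain
# 'mysql://' URL uses the mysqlclient (MySQLdb) driver installed above
engine = create_engine('mysql://root:desmondx@localhost/car_price')

# Store the cleaned DataFrame as a MySQL table ('table_name' is a placeholder;
# if_exists='replace' drops and recreates the table on each run).
# to_sql returns the number of rows written.
df_cleaned.to_sql('table_name', engine, if_exists='replace', index=False)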

472325

Data Pipeline Orchestration:

Data pipeline orchestration involves scheduling and managing the execution of data workflows. This ensures that data processing tasks are executed in the correct order and on time.
Example Python code for scheduling a daily data pipeline with Apache Airflow:

In [14]:
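from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

def data_processing():
    # Your data processing code here
    pass

# Place this file in Airflow's dags/ folder so the scheduler picks it up.
# catchup=False stops Airflow from backfilling every missed day since start_date.
dag = DAG(
    'data_pipeline',
    description='Data pipeline',
    schedule='@daily',  # replaces the deprecated schedule_interval (Airflow 2.4+)
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

data_processing_task = PythonOperator(task_id='data_processing', python_callable=data_processing, dag=dag)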
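Airflow derives task order from the dependencies declared between tasks, for example with the >> operator. The underlying idea is that each task runs only after everything it depends on has finished. It can be illustrated with the standard library's graphlib module (Python 3.9+). This is a toy sketch with hypothetical task names, not a description of how Airflow works internally:

```python
from graphlib import TopologicalSorter

# Records the order in which the hypothetical tasks actually run
run_log = []

def make_task(name):
    """Return a placeholder task that logs its own name when called."""
    def task():
        run_log.append(name)
    return task

tasks = {name: make_task(name) for name in ["ingest", "clean", "store", "validate"]}

# Each key lists the tasks it depends on: clean needs ingest, and both
# store and validate need clean (so they could run in parallel)
dependencies = {
    "clean": {"ingest"},
    "store": {"clean"},
    "validate": {"clean"},
}

# static_order() yields every task only after all of its dependencies
for name in TopologicalSorter(dependencies).static_order():
    tasks[name]()

print(run_log)
```

A real orchestrator adds scheduling, retries, and parallel execution on top of this ordering.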


Data Quality and Monitoring:

Data quality and monitoring involve ensuring the accuracy, completeness, and consistency of data throughout the data pipeline. This includes implementing data validation checks and monitoring data pipelines for errors and anomalies.

In [15]:
# Check for duplicate rows
if df.duplicated().any():
    print('Duplicate rows found!')

# Check for missing values
if df.isnull().values.any():
    print('Missing values found!')


Missing values found!
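In a scheduled pipeline, a printed message is easy to miss. One option is to collect every problem into a list that a pipeline task can log or turn into a failure. This is a sketch with hypothetical column names and rules:

```python
import pandas as pd

def find_quality_issues(df, required_columns, non_negative_columns):
    """Return a list of data quality problems found in df (empty if none)."""
    issues = []

    # Schema check: every expected column must be present
    missing_cols = [col for col in required_columns if col not in df.columns]
    if missing_cols:
        issues.append(f"missing columns: {missing_cols}")

    # Duplicate check: count rows that exactly repeat an earlier row
    dup_count = int(df.duplicated().sum())
    if dup_count:
        issues.append(f"duplicate rows: {dup_count}")

    # Completeness check: report missing values per column
    null_counts = df.isnull().sum()
    for col, count in null_counts[null_counts > 0].items():
        issues.append(f"{col} has {count} missing value(s)")

    # Range check: columns whose values should never be negative
    for col in non_negative_columns:
        if col in df.columns and (df[col] < 0).any():
            issues.append(f"{col}: negative values found")

    return issues

# Hypothetical sample with one problem of each kind
sample = pd.DataFrame({
    "make": ["Kia", "Kia", "BMW", None],
    "sellingprice": [21500, 21500, -1, 12000],
})
issues = find_quality_issues(sample, ["make", "sellingprice", "year"], ["sellingprice"])
for issue in issues:
    print(issue)

# Inside a pipeline task, raising makes the run fail visibly, e.g.:
# if issues: raise ValueError("; ".join(issues))
```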
