## Python + SQL data ingestion and analysis

**Python:**

1. Convert the CSV files into Parquet files.
2. Move the Parquet files into Postgres as raw data tables.

**SQL:**

3. Create the tables.
4. Insert data into the tables.
5. Find the average, minimum, and maximum sensor readings for a specific sensor within a given time period.
6. Find the sensors that experienced readings above a certain threshold value.
7. Calculate the hourly, daily, and monthly averages for sensor readings and store the results in appropriate tables.


In [3]:
import matplotlib.pyplot as plt
from sqlalchemy import create_engine
from pathlib import Path
import pandas as pd
import numpy as np
import os

In [4]:
def convert_csvs_to_parquet(dir_path=None):
    """
    Convert every CSV file in a folder into a Parquet file of the same name.

    Each ``<name>.csv`` found directly inside ``dir_path`` is read with pandas
    and written next to it as ``<name>.parquet`` using the pyarrow engine.
    Files are handled one at a time, so only a single DataFrame is held in
    memory at any moment.

    :param dir_path: path of the folder containing the CSV files
    :raises ValueError: if ``dir_path`` is empty or None
    :return: None
    """
    if not dir_path:
        raise ValueError("Please provide a path to the folder containing the CSV files")

    # Sorted so the processing order does not depend on the file system
    csv_names = sorted(name for name in os.listdir(dir_path) if name.endswith(".csv"))

    for csv_name in csv_names:
        csv_path = os.path.join(dir_path, csv_name)
        parquet_path = os.path.join(dir_path, Path(csv_name).stem + ".parquet")
        # Read and write within the same iteration instead of loading all files first
        pd.read_csv(csv_path).to_parquet(parquet_path, engine="pyarrow")


# NOTE(review): the credentials are kept in source only so existing callers
# behave as before; prefer passing db_url explicitly or loading it from config.
_DEFAULT_DB_URL = "postgresql://postgres:123123@localhost:5432/sensorsDB"


def move_parquet_into_postgre(dir_path: str, db_url: str = _DEFAULT_DB_URL):
    """
    Load every Parquet file in a folder into Postgres as a raw data table.

    Each ``<name>.parquet`` found directly inside ``dir_path`` is written to a
    table called ``<name>_raw``. The table name keeps the case of the file
    name. Column names are lower-cased, and when a frame has both a ``date``
    and a ``time`` column those two columns are dropped before loading.

    ``DataFrame.to_sql`` is called with its defaults, so loading into a table
    that already exists raises.

    :param dir_path: path of the folder containing the Parquet files
    :param db_url: SQLAlchemy connection URL of the target database
    :raises ValueError: if ``dir_path`` is empty or None
    :return: None
    """
    if not dir_path:
        raise ValueError("Please provide a path to the folder containing the Parquet files")

    # Sorted so the load order does not depend on the file system
    parquet_names = sorted(name for name in os.listdir(dir_path) if name.endswith(".parquet"))
    if not parquet_names:
        # Nothing to load, so do not open a database engine at all
        return

    # One engine for the whole run instead of a new one per file
    engine = create_engine(db_url)
    try:
        for parquet_name in parquet_names:
            df = pd.read_parquet(os.path.join(dir_path, parquet_name), engine="pyarrow")
            # Lower-case column names so they can be referenced unquoted in Postgres
            df.columns = [c.lower() for c in df.columns]
            # Remove date and time columns
            if 'date' in df.columns and 'time' in df.columns:
                df = df.drop(['date', 'time'], axis=1)

            df.to_sql(Path(parquet_name).stem + '_raw', engine)
    finally:
        # Release pooled connections even when a load fails part-way
        engine.dispose()


In [1]:
# Folder holding the source CSV files; the Parquet files are written alongside them.
data_dir = r"C:\Idan\DataYoga\Data"
# Step 1 converts CSV -> Parquet, step 2 loads Parquet -> Postgres "<name>_raw" tables.
# Run the cell that defines these two functions first; otherwise this cell
# fails with a NameError.
convert_csvs_to_parquet(dir_path=data_dir)
move_parquet_into_postgre(dir_path=data_dir)

NameError: name 'convert_csvs_to_parquet' is not defined

# 1. Create the tables:

```sql
-- One row per sensor; LocationID identifies the sensor's location
-- (the foreign key to Locations is added by a separate ALTER TABLE).
CREATE TABLE Sensors (
  SensorID INT NOT NULL,
  LocationID INT NOT NULL,
  PRIMARY KEY (SensorID)
);

-- Lookup table of locations.
-- VARCHAR rather than CHAR: CHAR(256) blank-pads every name to 256 characters.
CREATE TABLE Locations (
  LocationID INT NOT NULL,
  LocationName VARCHAR(256) NOT NULL,
  PRIMARY KEY (LocationID)
);

-- One row per sensor per instant.
-- TIMESTAMP WITH TIME ZONE matches the type the load step casts to, so the
-- values are not converted through the session time zone when inserted.
CREATE TABLE Readings (
  SensorID INT NOT NULL,
  TimeStamp TIMESTAMP WITH TIME ZONE NOT NULL,
  ReadingValue FLOAT NOT NULL,
  PRIMARY KEY (SensorID, TimeStamp),
  FOREIGN KEY (SensorID) REFERENCES Sensors (SensorID)
);
```


# Update the Sensors table to add a foreign-key constraint on LocationID
```sql
-- Every sensor must reference an existing location.
ALTER TABLE Sensors
  ADD CONSTRAINT sensors_location_constraint
    FOREIGN KEY (LocationID)
    REFERENCES Locations (LocationID);
```

# Convert the Readings_raw TimeStamp column to TIMESTAMP WITH TIME ZONE, interpreting the stored values as UTC.
```sql
-- The raw tables are referenced in this file by quoted, mixed-case names
-- (public."Readings_raw"); unquoted, Postgres folds the name to readings_raw,
-- which is a different relation.
-- The stored values are interpreted as UTC. The explicit cast to TIMESTAMP
-- also covers the case where the column was loaded as text.
ALTER TABLE public."Readings_raw"
ALTER COLUMN TimeStamp TYPE TIMESTAMP WITH TIME ZONE
  USING CAST(TimeStamp AS TIMESTAMP) AT TIME ZONE 'UTC';
```

# Insert data into the tables
```sql
-- Locations are loaded first because Sensors references them.
INSERT INTO locations (LocationID, LocationName)
SELECT
    l.LocationID,
    l.LocationName
FROM public."Locations_raw" AS l;

-- Sensors next (de-duplicated) because Readings references them.
INSERT INTO sensors (SensorID, LocationID)
SELECT DISTINCT
    s.SensorID,
    s.LocationID
FROM public."Sensors_raw" AS s;

-- Readings last; the raw timestamp is cast explicitly on the way in.
INSERT INTO readings (SensorID, TimeStamp, ReadingValue)
SELECT
    r.SensorID,
    CAST(r.TimeStamp AS timestamp with time zone),
    r.ReadingValue
FROM public."Readings_raw" AS r;
```


# Find the average, minimum, and maximum sensor readings for a specific sensor within a given time period.
```sql
-- Per-sensor maximum, minimum and mean of the readings taken in the window
-- below. Both bounds are inclusive, exactly as with BETWEEN.
-- NOTE(review): the upper bound is 2020-03-31 00:00:00, so readings later on
-- 31 March are excluded; confirm that is intended.
-- NOTE(review): every sensor is returned; add a SensorID predicate to
-- restrict the result to one sensor.
SELECT
    r.SensorID,
    MAX(r.ReadingValue) AS max_reading_value,
    MIN(r.ReadingValue) AS min_reading_value,
    AVG(r.ReadingValue) AS average_reading_value
FROM readings AS r
WHERE r.TimeStamp >= '2020-01-01 00:00:00'
  AND r.TimeStamp <= '2020-03-31 00:00:00'
GROUP BY r.SensorID
ORDER BY r.SensorID ASC;
```

# Find the sensors that experienced readings above a certain threshold value.
```sql
-- Sensors with at least one reading strictly above the threshold (200),
-- one row per sensor.
SELECT r.SensorID AS sensors_above_200
FROM readings AS r
WHERE r.ReadingValue > 200
GROUP BY r.SensorID
ORDER BY r.SensorID ASC;
```

# Calculate the hourly, daily, and monthly averages for sensor
# readings and store the results in appropriate tables.

```sql

-- Daily averages: mean reading per sensor per day number.
-- NOTE(review): EXTRACT(day ...) is the day of the month (1-31), so the same
-- day number from different months is averaged into one row; confirm that
-- this is intended rather than one row per calendar date.
CREATE TABLE daily_readings (
    SensorID     INT   NOT NULL,
    day          INT   NOT NULL,
    avg_readings FLOAT NOT NULL,
    PRIMARY KEY (SensorID, day)
);

-- Populate it from the readings table
INSERT INTO daily_readings (SensorID, day, avg_readings)
SELECT
    r.SensorID,
    EXTRACT(day FROM r.TimeStamp) AS day,
    AVG(r.ReadingValue)           AS avg_readings
FROM readings AS r
GROUP BY
    r.SensorID,
    EXTRACT(day FROM r.TimeStamp);

-- Hourly averages: mean reading per sensor per hour number.
-- NOTE(review): EXTRACT(hour ...) is the hour of the day (0-23), so the same
-- hour from different days is averaged into one row; confirm that this is
-- intended rather than one row per clock hour of each date.
CREATE TABLE hourly_readings (
    SensorID     INT   NOT NULL,
    hour         INT   NOT NULL,
    avg_readings FLOAT NOT NULL,
    PRIMARY KEY (SensorID, hour)
);

-- Populate it from the readings table
INSERT INTO hourly_readings (SensorID, hour, avg_readings)
SELECT
    r.SensorID,
    EXTRACT(hour FROM r.TimeStamp) AS hour,
    AVG(r.ReadingValue)            AS avg_readings
FROM readings AS r
GROUP BY
    r.SensorID,
    EXTRACT(hour FROM r.TimeStamp);


-- Monthly averages: mean reading per sensor per month number.
-- NOTE(review): EXTRACT(month ...) is the month of the year (1-12), so the
-- same month from different years is averaged into one row; confirm that
-- this is intended.
CREATE TABLE monthly_readings (
    SensorID     INT   NOT NULL,
    month        INT   NOT NULL,
    avg_readings FLOAT NOT NULL,
    PRIMARY KEY (SensorID, month)
);

-- Populate it from the readings table
INSERT INTO monthly_readings (SensorID, month, avg_readings)
SELECT
    r.SensorID,
    EXTRACT(month FROM r.TimeStamp) AS month,
    AVG(r.ReadingValue)             AS avg_readings
FROM readings AS r
GROUP BY
    r.SensorID,
    EXTRACT(month FROM r.TimeStamp);
```

# Find trends using an 8-hour moving average
```sql 
-- Smoothed hourly averages: for each sensor, the mean of the current hourly
-- row and the 7 hourly rows before it (8 rows in total). The first rows of
-- each sensor average over however many rows are available.
CREATE TABLE hourly_moving_averages (
  sensorID INT NOT NULL,
  hour INT NOT NULL,
  avg_readings FLOAT NOT NULL,
  moving_average FLOAT NOT NULL
);

INSERT INTO hourly_moving_averages (sensorID, hour, avg_readings, moving_average)
SELECT sensorID,
       hour,
       avg_readings,
       -- 7 PRECEDING plus CURRENT ROW is an 8-row window; 8 PRECEDING would
       -- average 9 rows. Ordering by hour alone is enough inside a
       -- per-sensor partition.
       AVG(avg_readings) OVER (PARTITION BY sensorID
                               ORDER BY hour
                               ROWS BETWEEN 7 PRECEDING AND CURRENT ROW) AS moving_average
FROM hourly_readings;


```