In [27]:
import json
from fastapi import HTTPException
from pydantic import BaseModel
from typing import List
from pydantic import ValidationError

In [28]:
# 1. create the data model
class ProductionData(BaseModel):
    timestamp: str
    temperature: float
    pressure: float
    class Config:
        extra = 'forbid'

def process_data(file_path):
    try:
        # 2. loads the file in json
        with open(file_path, 'r') as f:
            json_data = json.load(f)  
            
        # 3. validates the data
        validated_data = [ProductionData(**item) for item in json_data]
        # 4. calculates the average value
        total_temperature = sum(item.temperature for item in validated_data)
        total_pressure = sum(item.pressure for item in validated_data)
        
        avg_temperature = total_temperature / len(validated_data)
        avg_pressure = total_pressure / len(validated_data)
        return {
        "average_temperature": avg_temperature,
        "average_pressure": avg_pressure
        }
    
    except ValidationError as e:
        print("Validation Error:")
        print(e.json())  
        return None
    except FileNotFoundError as e:
        print("File Not Found Error:")
        return None

#file_path = "/Users/dongsicheng/Documents/Working/Bosch_Code/production_data"
file_path = "/Users/dongsicheng/Documents/Working/Bosch_Code/data_with_wrong_information"
process_data(file_path)

Validation Error:
[{"type":"missing","loc":["pressure"],"msg":"Field required","input":{"timestamp":"2024-11-01T12:00:00","temperature":78.5,"wrong":101.2},"url":"https://errors.pydantic.dev/2.10/v/missing"},{"type":"extra_forbidden","loc":["wrong"],"msg":"Extra inputs are not permitted","input":101.2,"url":"https://errors.pydantic.dev/2.10/v/extra_forbidden"}]


In [29]:
from fastapi import FastAPI
import uvicorn
import nest_asyncio
from typing import Union
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

In [30]:
# define the fastapi
app = FastAPI()
# metadata for local influx DB
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "orh6W7t9mOhm7AoMZUyAxsbm5bB3vJ2CoEobis485vSo-5bTdNUlOeCpSoMadZqo9PWMN2dQfpte19t1d0rt4w=="
INFLUXDB_ORG = "SichengDong_Work"
INFLUXDB_BUCKET = "Machine_Data"

client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

# transfer the influx table data structure to dict
def influx_query_to_dict(result) -> dict:
    avg_temperature = 0
    avg_pressure = 0
    for table in result:
        for record in table.records:
            if record.get_field() == "temperature":
                avg_temperature = record.get_value()
            elif record.get_field() == "pressure":
                avg_pressure = record.get_value()
    return {"average_temperature": avg_temperature, "average_pressure": avg_pressure}

# build the api
@app.get("/")
async def welcome_user():
    return {"Welcome to the Online Data Processing Platform !"}

@app.get("/status")
def get_status():
    # 1. build the query for influx DB
    query = f'''
        from(bucket: "{INFLUXDB_BUCKET}")
        |> range(start: -30d)
        |> filter(fn: (r) => r._measurement == "Machine_data")
        |> filter(fn: (r) => r._field == "temperature" or r._field == "pressure")
        |> group(columns: ["_field"])
        |> mean()
    '''
    try:
        # 2. get the Influx table data
        result = query_api.query(query=query)
        
        if not result:
            raise HTTPException(status_code=404, detail="No data available.")
        # 3. transfer it to dict value
        data = influx_query_to_dict(result)
        return data
    
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/data")
def add_data(new_data: Union[List[ProductionData], ProductionData]):
    points = []
    
    if isinstance(new_data, list):
        for item in new_data:
            point = (
                Point("Machine_data")
                .time(item.timestamp) 
                .field("temperature", item.temperature)
                .field("pressure", item.pressure)
            )
            points.append(point)
    else:
        point = (
            Point("Machine_data")
            .time(new_data.timestamp)  
            .field("temperature", new_data.temperature)
            .field("pressure", new_data.pressure)
        )
        points.append(point)

    write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=points)
    
    return {"message": "Data added successfully"}

# specific setting for jupyter notebook
nest_asyncio.apply()

def run():
    uvicorn.run(app, host="127.0.0.1", port=8000)

# run the service
run()

INFO:     Started server process [21988]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:53083 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:53083 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:53083 - "GET /openapi.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:53084 - "GET /status HTTP/1.1" 500 Internal Server Error
INFO:     127.0.0.1:53090 - "POST /data HTTP/1.1" 200 OK
INFO:     127.0.0.1:53091 - "GET /status HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [21988]
