-
Notifications
You must be signed in to change notification settings - Fork 2
/
transform_function.py
61 lines (46 loc) · 2.4 KB
/
transform_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
import pandas as pd
import json
def kelvin_to_fahrenheit(temp_in_kelvin: float) -> float:
    """Convert a temperature from Kelvin to degrees Fahrenheit.

    Args:
        temp_in_kelvin: Temperature in Kelvin.

    Returns:
        The equivalent temperature in degrees Fahrenheit.
    """
    # Kelvin -> Celsius (subtract 273.15), then Celsius -> Fahrenheit.
    temp_in_fahrenheit = (temp_in_kelvin - 273.15) * (9 / 5) + 32
    return temp_in_fahrenheit
# transform function
def transform_load_data(task_instance):
    """Transform the extracted weather payload and write it to S3 as a CSV.

    Pulls the XCom value produced by the ``extract_weather_data`` task,
    converts its temperatures from Kelvin to Fahrenheit, shifts its epoch
    timestamps by the payload's ``timezone`` offset, and writes the result as
    a single-row CSV file to S3.

    Args:
        task_instance: Object exposing ``xcom_pull(task_ids=...)``
            (presumably the Airflow ``TaskInstance`` injected into the
            callable -- confirm against the DAG definition).

    Raises:
        ValueError: If no XCom value is available from ``extract_weather_data``.
        KeyError: If the payload lacks one of the fields read below.
    """
    data = task_instance.xcom_pull(task_ids="extract_weather_data")
    if data is None:
        # Fail with a clear message instead of "'NoneType' is not subscriptable".
        raise ValueError(
            "No XCom value found for task 'extract_weather_data'; "
            "nothing to transform."
        )

    main = data["main"]
    sun = data["sys"]
    # The offset is added to epoch seconds, so the resulting naive datetimes
    # are shifted from UTC by that many seconds.
    tz_offset = data["timezone"]

    # NOTE: the keys below are the CSV column headers. The spellings
    # "Minimun Temp (F)" and "Humidty" are kept as-is because existing
    # consumers of the CSV may depend on them.
    transformed_data = {
        "City": data["name"],
        "Description": data["weather"][0]["description"],
        "Temperature (F)": kelvin_to_fahrenheit(main["temp"]),
        "Feels Like (F)": kelvin_to_fahrenheit(main["feels_like"]),
        "Minimun Temp (F)": kelvin_to_fahrenheit(main["temp_min"]),
        "Maximum Temp (F)": kelvin_to_fahrenheit(main["temp_max"]),
        "Pressure": main["pressure"],
        "Humidty": main["humidity"],
        "Wind Speed": data["wind"]["speed"],
        "Time of Record": datetime.utcfromtimestamp(data["dt"] + tz_offset),
        "Sunrise (Local Time)": datetime.utcfromtimestamp(sun["sunrise"] + tz_offset),
        "Sunset (Local Time)": datetime.utcfromtimestamp(sun["sunset"] + tz_offset),
    }
    df_weather = pd.DataFrame([transformed_data])

    # File name: fixed prefix plus the current local time as DDMMYYYYHHMMSS.
    dt_string = "current_weather_data_stockholm" + datetime.now().strftime("%d%m%Y%H%M%S")

    # No explicit credentials are passed here, so the S3 filesystem layer is
    # presumably relying on the worker's default AWS credentials (verify for
    # your deployment). To supply keys explicitly, pass
    # storage_options={"key": ..., "secret": ..., "token": ...} to to_csv
    # rather than hard-coding secrets in this file.
    df_weather.to_csv(f"s3://<s3 bucket name>/{dt_string}.csv", index=False)