In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType

# Create spark session
spark = SparkSession.builder.appName('Processing Data').master('localhost').getOrCreate()

# Create a schema
schema = StructType([ \
 StructField("rate", FloatType(), True) 
])
 
# Load data
df = spark.read.schema(schema).csv('dags/rates.csv')

# Create a temp view
df.createOrReplaceTempView(name='rates')

In [28]:
query = '''

select *
from rates;
'''
result = spark.sql(query)
print(result.show(5))


+----+
|rate|
+----+
|null|
| 0.0|
| 1.0|
| 2.0|
| 3.0|
+----+
only showing top 5 rows

None


In [None]:
import pytest
from dags.python.Helper import extract_rates, extract_rates_dictionary, create_dataframe, load_to_google_storage
import configparser
import pandas as pd

# Configs
config = configparser.ConfigParser()
config.read('/projects/stock_analysis_platform/dags/python/pipeline.conf')
api_key = config.get('fixer_io_api_key', 'api_key')


with open('/projects/stock_analysis_platform/rates_test.txt', 'w') as write_file:
    write_file.write(rates)



In [36]:
import json

with open('/projects/stock_analysis_platform/rates_test.txt', 'r') as read_file:
    results = read_file.read()
    read_file.close()

rates = extract_rates_dictionary(results)

assert '2022-01-01' in rates.keys()


In [55]:
with open('/projects/stock_analysis_platform/rates_test.txt', 'r') as read_file:
        results = read_file.read()
        read_file.close()

rates = extract_rates_dictionary(results)

start_date = "2022-01-01"
end_date = "2022-01-02"

    # Call the function
df = create_dataframe(rates, start_date, end_date, export_to_csv=False)

In [56]:
df

Unnamed: 0,AED,AFN,ALL,AMD,ANG,AOA,ARS,AUD,AWG,AZN,...,XAU,XCD,XDR,XOF,XPF,YER,ZAR,ZMK,ZMW,ZWL
0,4.176782,117.979297,121.277018,546.096265,2.049808,626.101357,116.773434,1.564485,2.046862,1.937662,...,0.000622,3.073193,0.812663,658.979869,120.367257,284.571056,18.138658,10235.677475,18.94874,366.160363
1,4.176782,117.979297,121.277018,546.096265,2.049808,626.101357,116.773434,1.564485,2.046862,1.937662,...,0.000622,3.073193,0.812663,658.979869,120.367257,284.571056,18.138658,10235.677475,18.94874,366.160363
2,4.177235,117.993578,121.292102,546.164583,2.050064,626.17972,116.759289,1.565111,2.047118,1.93185,...,0.000622,3.073577,0.812765,659.057648,120.382047,284.605769,18.156679,10236.956907,18.95111,366.20617


In [44]:
# Create a dataframe with the columns and indices from the first day's data
first_day_data = rates.get(str(start_date))

first_day_df = pd.DataFrame(data=first_day_data, index=[0])
first_day_df

Unnamed: 0,AED,AFN,ALL,AMD,ANG,AOA,ARS,AUD,AWG,AZN,...,XAU,XCD,XDR,XOF,XPF,YER,ZAR,ZMK,ZMW,ZWL
0,4.176782,117.979297,121.277018,546.096265,2.049808,626.101357,116.773434,1.564485,2.046862,1.937662,...,0.000622,3.073193,0.812663,658.979869,120.367257,284.571056,18.138658,10235.677475,18.94874,366.160363


In [46]:
dates = pd.date_range(start=start_date, end=end_date, freq='D')
dates

DatetimeIndex(['2022-01-01', '2022-01-02'], dtype='datetime64[ns]', freq='D')

In [47]:
for date in dates.date:
    data = rates.get(str(date))
    if data is None:
            # Skip the date if there is no data for it
        continue
            # Append the data for the date to the dataframe
    first_day_df = first_day_df.append(data, ignore_index=True)

In [48]:
first_day_df

Unnamed: 0,AED,AFN,ALL,AMD,ANG,AOA,ARS,AUD,AWG,AZN,...,XAU,XCD,XDR,XOF,XPF,YER,ZAR,ZMK,ZMW,ZWL
0,4.176782,117.979297,121.277018,546.096265,2.049808,626.101357,116.773434,1.564485,2.046862,1.937662,...,0.000622,3.073193,0.812663,658.979869,120.367257,284.571056,18.138658,10235.677475,18.94874,366.160363
1,4.176782,117.979297,121.277018,546.096265,2.049808,626.101357,116.773434,1.564485,2.046862,1.937662,...,0.000622,3.073193,0.812663,658.979869,120.367257,284.571056,18.138658,10235.677475,18.94874,366.160363
2,4.177235,117.993578,121.292102,546.164583,2.050064,626.17972,116.759289,1.565111,2.047118,1.93185,...,0.000622,3.073577,0.812765,659.057648,120.382047,284.605769,18.156679,10236.956907,18.95111,366.20617


In [49]:
first_day_df.to_csv('dags/rates.csv')

In [50]:
first_day_df

Unnamed: 0,AED,AFN,ALL,AMD,ANG,AOA,ARS,AUD,AWG,AZN,...,XAU,XCD,XDR,XOF,XPF,YER,ZAR,ZMK,ZMW,ZWL
0,4.176782,117.979297,121.277018,546.096265,2.049808,626.101357,116.773434,1.564485,2.046862,1.937662,...,0.000622,3.073193,0.812663,658.979869,120.367257,284.571056,18.138658,10235.677475,18.94874,366.160363
1,4.176782,117.979297,121.277018,546.096265,2.049808,626.101357,116.773434,1.564485,2.046862,1.937662,...,0.000622,3.073193,0.812663,658.979869,120.367257,284.571056,18.138658,10235.677475,18.94874,366.160363
2,4.177235,117.993578,121.292102,546.164583,2.050064,626.17972,116.759289,1.565111,2.047118,1.93185,...,0.000622,3.073577,0.812765,659.057648,120.382047,284.605769,18.156679,10236.956907,18.95111,366.20617


In [53]:
with open('/projects/stock_analysis_platform/rates_test.txt', 'r') as read_file:
        results = read_file.read()
        read_file.close()

rates = extract_rates_dictionary(results)

start_date = "2022-01-01"
end_date = "2022-01-02"


def create_dataframe(rates: dict, start_date: str, end_date: str, export_to_csv=True) -> pd.DataFrame:

    # Create a dataframe with the columns and indices from the first day's data
    first_day_data = rates.get(str(start_date))
    if first_day_data is None:
        # Return an empty dataframe if there is no data for the start date
        return pd.DataFrame()
    first_day_df = pd.DataFrame(data=first_day_data, index=[0])

    # Iterate over the dates from the start_date to the end_date
    dates = pd.date_range(start=start_date, end=end_date, freq='D')
    for date in dates.date:
        data = rates.get(str(date))
        if data is None:
            # Skip the date if there is no data for it
            continue
            # Append the data for the date to the dataframe
        first_day_df = first_day_df.append(data, ignore_index=True)

    if export_to_csv == True:
        first_day_df.to_csv('dags/rates.csv')
        
    return first_day_df

In [57]:
df = create_dataframe(rates, start_date, end_date, export_to_csv=False)

In [67]:
assert df.iloc[0:1, 0].values == 4.176782