In [5]:
# Import the necessary modules
import json
import requests
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from rich import print as rprint
import pandas as pd
from decimal import Decimal

In [23]:
genesis_url = 'https://www-genesis.destatis.de/genesisWS/rest/2020/data/cube?'
user_id = 'DE17T29R57'
password = '4Bf/3Ap)3]r2,,h'

# Wärmepreisindex
cubecode = '61111BM006'
areatype = 'all'
category = 'all'
content = ''
start_year = ''
classifier1 = 'CC13B1'
key1 = 'CC13-77'
lang = 'en'

# Destatis Base Url
url = (f"{genesis_url}username={user_id}&password={password}&language=de&name={cubecode}&area={areatype}"
       f"&compress=true&contents={content}&startyear={start_year}")

# Add Classifyers to url
if classifier1:
       url += f'&classifyingvariable1={classifier1}'    
if key1:
       url += f'&classifyingkey1={key1}'

# request 
response = requests.get(url)

# First, parse the JSON response with Decimal conversion for floats
#Float types are not supported with dynamodb; use Decimal types instead
parsed_json = json.loads(response.text, parse_float=Decimal)

# Now, access the 'Content' part and split it by newline
data = parsed_json["Object"]['Content'].split('\n')

# Filter out the relevant lines
filtered_response = [line for line in data if 'D' in line and 'e' in line and ('MONAT' in line or 'QUART' in line)]

# Split each line into its components and extract the relevant information
data = [{'field_D': parts[0],
         'field_DG': parts[1],
         'classifyingkey1': parts[2],
         'period': parts[3],
         'year': int(parts[4]),
         'value': float(parts[5]),
         'field_e': parts[6]
        }
        for parts in (line.split(';') for line in filtered_response)]

# Create a DataFrame from the extracted data
df = pd.DataFrame(data)

# Determine if the data is monthly or quarterly based on the unique values in the 'period' column
if df['period'].str.contains('Q').any():
    period_key = 'quarter'
else:
    period_key = 'month'

# Create result dict
# Create the results dict and convert float to Decimal
result = {'cubeCode': cubecode, 'content': content, 'classifyingVar1': classifier1, 'classifyingKey1': key1, 'data': []}
for year, group in df.groupby('year'):
    year_data = {
        'year': year,
        'df': [{period_key: row['period'], 'value': (row['value'])} for _, row in group.iterrows()]
    }
    result['data'].append(year_data)
    
j = json.dumps(result, indent=4)
rprint(j)

In [44]:
# DynamoDB Configurations
table_name = 'onetable'
dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')
table = dynamodb.Table(table_name)

In [24]:
# Function to upload data to DynamoDB
def upload_to_dynamodb(table, data_dict):
    try:
        for year_info in data_dict['data']:  # Iterate over each year
            year = year_info['year']
            for month_data in year_info['df']:  # Iterate over each month's data
                index = "index"
                sort_key = f"IND#{data_dict['cubeCode']}#{year}"
                
                # Prepare the item dictionary for DynamoDB
                item = {
                    'pk': index,
                    'sk': sort_key,
                    'cubeCode': data_dict['cubeCode'],
                    'classifyingVar1': data_dict['classifyingVar1'],
                    'classifyingKey1': data_dict['classifyingKey1'],
                    'year': year,
                    'month': month_data[period_key],  # Use period_key for the month column name
                    'value': Decimal(str(month_data['value']))  # Convert value to Decimal
                }
                
                # Put the item into DynamoDB
                table.put_item(Item=item)
                print(f"Uploaded item: {item}")

    except Exception as e:
        print(f"An error occurred: {e}")

# Assuming 'result' is your previously defined dictionary with data
upload_to_dynamodb(table, result)

Uploaded item: {'pk': 'index', 'sk': 'IND#61111BM006#1991', 'cubeCode': '61111BM006', 'classifyingVar1': 'CC13B1', 'classifyingKey1': 'CC13-77', 'year': 1991, 'month': 'MONAT01', 'value': Decimal('38.7')}
Uploaded item: {'pk': 'index', 'sk': 'IND#61111BM006#1991', 'cubeCode': '61111BM006', 'classifyingVar1': 'CC13B1', 'classifyingKey1': 'CC13-77', 'year': 1991, 'month': 'MONAT02', 'value': Decimal('38.9')}
Uploaded item: {'pk': 'index', 'sk': 'IND#61111BM006#1991', 'cubeCode': '61111BM006', 'classifyingVar1': 'CC13B1', 'classifyingKey1': 'CC13-77', 'year': 1991, 'month': 'MONAT03', 'value': Decimal('39.6')}
Uploaded item: {'pk': 'index', 'sk': 'IND#61111BM006#1991', 'cubeCode': '61111BM006', 'classifyingVar1': 'CC13B1', 'classifyingKey1': 'CC13-77', 'year': 1991, 'month': 'MONAT04', 'value': Decimal('39.8')}
Uploaded item: {'pk': 'index', 'sk': 'IND#61111BM006#1991', 'cubeCode': '61111BM006', 'classifyingVar1': 'CC13B1', 'classifyingKey1': 'CC13-77', 'year': 1991, 'month': 'MONAT05', '

In [47]:
# use PartiQL to query the table
# this solution avoids a potential SQL injection problem

# Initialize the DynamoDB service resource
dynamodb = boto3.client('dynamodb', region_name='eu-west-1') 

pk = 'index'
sk = 'IND#61111BM006#1991'# This should be the value of the Index that corresponds to the data you want

# The year you are interested in
year = "2005"

# Prepare the PartiQL statement to query data
stmt = f"SELECT * FROM {table_name} WHERE pk = ? AND  sk = ?"

# Parameters for the query
pmt = [
    {"S": pk},
    {"S": sk}# For begins_with(SK, 'IND#1991')
]

# Execute the statement
resp = dynamodb.execute_statement(
    Statement=stmt , Parameters= pmt
)

# Print the response items
rprint(resp['Items'])