In [179]:

import pandas as pd
def from_excel_to_csv(filePath):
    df = pd.read_excel(f"{filePath}.xlsx")
    df.to_csv(f"{filePath}.csv", index=False)
    return 'csv'

In [180]:

def read_file_by_ending(filePath, endingType):
    readers = {
        'csv': pd.read_csv,
        'xlsx': pd.read_excel,
        'json': pd.read_json,
        'parquet': pd.read_parquet,
    }

    if endingType not in readers:
        raise ValueError(f"Unsupported file type: {endingType}")

    df = readers[endingType](f"{filePath}.{endingType}")
    return df



In [181]:

def clear_non_valid_dates(df):
    df['timestamp']  = pd.to_datetime(df['timestamp'], format='%Y-%m-%d %H:%M:%S', errors='coerce')
    df_valid_dates = df[df['timestamp'].notna()]
    return df_valid_dates



In [182]:
def clear_duplicated_dates(df):
    df_grouped = df.groupby('timestamp', as_index=False)['value'].sum()
    return df_grouped
        
    


In [183]:

def remove_non_numeric_values(df):
    df['value'] = pd.to_numeric(df['value'], errors='coerce')
    df = df.dropna(subset=['value'])
    return df

In [210]:
def average_for_hour(df, path, endingType):
    df['start_time'] = df['timestamp'].dt.floor('H')  
    hour_average = df[['start_time', 'value']].groupby('start_time').mean()
#     writers = {
#         'csv': lambda df, path: df.to_csv(path, index=True),
#         'xlsx': lambda df, path: df.to_excel(path, index=False),
#         'json': lambda df, path: df.to_json(path, orient='records', lines=True),
#         'parquet': lambda df, path: df.to_parquet(path, index=False),
#     }
#     if endingType not in writers:
#         raise ValueError(f"Unsupported file type: {endingType}")

    # Create the file path and save the result
#     file_path = f"{path}/hour_average.{endingType}"
#     writers[endingType](hour_average, file_path)
    hour_average.to_csv(f"{path}/hour_average.csv")
# def average_for_hour(df, path, endingType):
#     # Calculate the hourly average
#     df['start_time'] = df['timestamp'].dt.floor('H')  
#     hour_average = df[['start_time', 'value']].groupby('start_time').mean()

#     # Save the result based on the file type (endingType)
#     writers = {
#         'csv': lambda df, path: df.to_csv(path, index=True),
#         'xlsx': lambda df, path: df.to_excel(path, index=False),
#         'json': lambda df, path: df.to_json(path, orient='records', lines=True),
#         'parquet': lambda df, path: df.to_parquet(path, index=False),
#     }

#     # Check if the file type is valid
#     if endingType not in writers:
#         raise ValueError(f"Unsupported file type: {endingType}")

#     # Create the file path and save the result
#     file_path = f"{path}/hour_average.{endingType}"
#     writers[endingType](hour_average, file_path)
#     print(f"Saved the average hourly data to {file_path}")

In [185]:
def group_df_by_days(df):
    df['hour'] = df['timestamp'].dt.hour
    df['date'] = df['timestamp'].dt.date
    rows_for_day = df[['timestamp','date', 'hour','value']].groupby(['date']).count()
    df_sorted = df.sort_values('timestamp') 
    return df_sorted[['timestamp', 'value']], rows_for_day['hour']


In [186]:
def split_df_to_files_byDays(df, rows_range, path, endingType):

    writers = {
        'csv': lambda df, path: df.to_csv(path, index=True),
        'xlsx': lambda df, path: df.to_excel(path, index=False),
        'json': lambda df, path: df.to_json(path, orient='records', lines=True),
        'parquet': lambda df, path: df.to_parquet(path, index=False),
    }

    if endingType not in writers:
        raise ValueError(f"Unsupported file type: {endingType}")

    index = 0;
    for j in range(rows_range.shape[0]):
        df_to_file = df[index:index+rows_range[j]]
        index += rows_range[j]
        file_name = f"{path}/time_series{j + 1}.{endingType}"
        writers[endingType](df_to_file, file_name) 


In [188]:

import time
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
from datetime import datetime


def read_files(readingPath, index , endingType):
    readers = {
        'csv': pd.read_csv,
        'xlsx': pd.read_excel,
        'json': pd.read_json,
        'parquet': pd.read_parquet,
    }

    if endingType not in readers:
        raise ValueError(f"Unsupported file type: {endingType}")

    df_form_file = readers[endingType](f"{readingPath}{index+1}.{endingType}")
    df_form_file['timestamp'] = pd.to_datetime(df_form_file['timestamp'], errors='coerce')
    df_form_file['start_time'] = df_form_file['timestamp'].dt.floor('H')

    file_result = df_form_file[['start_time','value']].groupby(['start_time']).mean()
    return file_result

        

In [189]:
import time
from concurrent.futures import ThreadPoolExecutor
def read_from_files_and_union_the_averages(path , endingType):
    union_df = pd.DataFrame()
    with ThreadPoolExecutor() as executor:
        dfs = [executor.submit(read_files, f"{path}/time_series", index  , endingType) for index in range(30)]

    end = time.perf_counter()
    df_list = [future.result() for future in dfs]
    union_df = pd.concat(df_list, ignore_index=False)
    return union_df


In [200]:
def save_result_in_file(df,filePath):
#     writers = {
#         'csv': lambda df, path: df.to_csv(path, index=True),
#         'xlsx': lambda df, path: df.to_excel(path, index=False),
#         'json': lambda df, path: df.to_json(path, orient='records', lines=True),
#         'parquet': lambda df, path: df.to_parquet(path, index=False),
#     }

#     if endingType not in writers:
#         raise ValueError(f"Unsupported file type: {endingType}")
    df.to_csv(f"{filePath}/filesResult.csv")
#     writers[endingType](df, path) 

In [211]:

def from_timestamp_file_hour_value(filePath , writingPath , endingType):
    if(endingType=='xlsx'):
        endingType = from_excel_to_csv(filePath)
    df = read_file_by_ending(filePath, endingType)
    df_valid_dates = clear_non_valid_dates(df) 
    df_clear_dup_dated = clear_duplicated_dates(df_valid_dates)
    df_valid_values = remove_non_numeric_values(df_clear_dup_dated)
    average_for_hour(df_valid_values, writingPath , endingType)
    df_sorted, rows_for_day = group_df_by_days(df_valid_values)
    split_df_to_files_byDays(df_sorted, rows_for_day, writingPath, endingType)
    union_df = read_from_files_and_union_the_averages(writingPath , endingType)
    save_result_in_file(union_df , writingPath)
    


In [213]:
from_timestamp_file_hour_value( "./files/time_series","./files/time_serieses", 'xlsx')
from_timestamp_file_hour_value( "./files/time_series","./files/time_serieses", 'parquet')


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['start_time'] = df['timestamp'].dt.floor('H')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['hour'] = df['timestamp'].dt.hour
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['date'] = df['timestamp'].dt.date
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['start_time'] = df['timestamp'].dt.floor('H')  # Round down to the start of the hour
