## **part one**

In [None]:
import pandas as pd
import heapq
from collections import Counter
from google.colab import files


In [None]:

uploadedOne = files.upload()

In [None]:
file_path = next(iter(uploadedOne))

In [None]:

total_error_counts = Counter()
chunk_size = 10000

def process_chunk(chunk):
    """ the func processes a chunk from the file and counts thw errors by the type"""
    errors = [line.split(',')[-1].strip() for line in chunk]
    error_counts = Counter(errors)
    total_error_counts.update(error_counts)


with open(file_path, 'r', encoding='utf-8') as file:
    chunk = []
    for i, line in enumerate(file, 1):
        chunk.append(line)

        if i % chunk_size == 0:
            process_chunk(chunk)
            chunk = []

    if chunk:
        process_chunk(chunk)


def print_top_errors(n):
    """the func prints the top n errors"""
    print(f"\nthe {n} common errors in the file")
    top_errors = heapq.nlargest(N, total_error_counts.items(), key=lambda x: x[1])
    for error, count in top_errors:
        print(f"{error}: {count} times")


N = 5
print_top_errors(N)


the 5 common errors in the file
Error: WARN_101: 200098 times
Error: ERR_404: 200094 times
Error: ERR_400: 200069 times
Error: INFO_200: 199931 times
Error: ERR_500: 199808 times


## **סיבוכיות זמן ומקום:**

# סיבוכיות זמן:
 - קריאת הקובץ ופריסת השורות לכל chunk של 10,000 שורות: O(n), כאשר n הוא מספר השורות בקובץ.
 - עיבוד כל chunk והעדכון במילון Counter: O(n).
 - מציאת ה-M השגיאות הנפוצות ביותר עם heapq.nlargest: O(k log M), כאשר k הוא מספר השגיאות הייחודיות ו-M הוא מספר השגיאות הנפוצות ביותר שרוצים להדפיס.

# סיבוכיות מקום:
 - שמירת כל chunk בזיכרון: O(chunk_size).
 - מילון total_error_counts שמכיל את כל השגיאות הייחודיות: O(k).
 - שימוש ב-heapq.nlargest לשמירת רק ה-M השגיאות הנפוצות ביותר: O(M).

# **סיכום:**
 :סיבוכיות הזמן  O(n + k log M) והסיבוכיות מקום: O(k+chunk_size+M)



## **part b**

In [None]:
from google.colab import files
uploaded = files.upload()

Saving time_series.parquet to time_series.parquet


In [None]:
import pandas as pd
import os

In [None]:

file_path = next(iter(uploaded))

if file_path.endswith(".csv"):
    df = pd.read_csv(file_path)
elif file_path.endswith(".parquet"):
    df = pd.read_parquet(file_path)


In [None]:
df.head()

Unnamed: 0,timestamp,value
0,28/06/2025 12:00:52,18.5
1,01/06/2025 04:17:23,46.3
2,10/06/2025 17:02:57,76.0
3,23/06/2025 05:23:22,56.4
4,05/06/2025 07:20:08,67.9


In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 2 columns):
 #   Column     Non-Null Count    Dtype 
---  ------     --------------    ----- 
 0   timestamp  1000000 non-null  object
 1   value      900960 non-null   object
dtypes: object(2)
memory usage: 15.3+ MB


In [None]:
def check_invalid_dates(df, date_column="timestamp"):
  """the func checks that the data is in a good format """
  df[date_column] = pd.to_datetime(df[date_column], errors="coerce")
  invalid_dates = df[df[date_column].isna()]
  if not invalid_dates.empty:
    print(invalid_dates)
    df.dropna(subset=[date_column], inplace=True)
    print(df.shape)
  else:
    print("no invalid dates")
  return df


In [None]:
def check_double_data(df):
  """ the func checks that there a no double data"""
  duplicate_rows = df[df.duplicated()]
  if not duplicate_rows.empty:
    print(duplicate_rows)
    df.drop_duplicates(inplace=True)
    print(df.shape)
  else:
    print("no double data")
  return df



In [None]:
def check_nan_or_non_numeric(df, column="value"):
  """the func checks that ther a no nan or not numeric values"""
  nan_values = df[column].isna()
  if nan_values.any():
    print(df[nan_values])
    df.dropna(subset=[column], inplace=True)
    print(df.shape)
  else:
    print("no nan values")
  non_numeric_values = df[~df[column].apply(lambda x: pd.to_numeric(x, errors='coerce')).notna()]
  if not non_numeric_values.empty:
    print(non_numeric_values)
    df.drop(non_numeric_values.index, inplace=True)
    print(df.shape)
  return df




In [None]:
def check_invalidion(df):
  """the func checks that the data is in a good format """
  df = check_invalid_dates(df)
  df = check_double_data(df)
  df = check_nan_or_non_numeric(df)
  return df

In [None]:
df=check_invalidion(df)
df.head()


  df[date_column] = pd.to_datetime(df[date_column], errors="coerce")


no invalid dates
                 timestamp         value
16085  2025-06-12 22:31:23          None
26420  2025-06-05 01:39:21          None
39052  2025-06-10 09:54:06          None
40835  2025-06-05 12:07:30          None
49961  2025-06-03 16:20:32  not_a_number
...                    ...           ...
999300 2025-06-03 11:17:22          None
999429 2025-06-22 20:06:07          None
999478 2025-06-09 11:42:30          None
999508 2025-06-12 12:53:11          None
999536 2025-06-17 13:31:50  not_a_number

[2598 rows x 2 columns]
(997402, 2)
                 timestamp value
22     2025-06-25 23:12:45  None
25     2025-06-15 03:15:48  None
35     2025-06-10 20:49:35  None
42     2025-06-12 08:00:35  None
60     2025-06-22 23:00:33  None
...                    ...   ...
999966 2025-06-24 10:57:02  None
999971 2025-06-07 22:13:57  None
999979 2025-06-29 18:40:51  None
999980 2025-06-10 15:39:18  None
999996 2025-06-26 21:09:47  None

[97088 rows x 2 columns]
(900314, 2)
                 tim

Unnamed: 0,timestamp,value
0,2025-06-28 12:00:52,18.5
1,2025-06-01 04:17:23,46.3
2,2025-06-10 17:02:57,76.0
3,2025-06-23 05:23:22,56.4
4,2025-06-05 07:20:08,67.9


In [None]:

df.shape

(850856, 2)

In [None]:
def calculate_hourly_avg(df, time_column="timestamp", value_column="value"):
    """the func calculates the mean valeu per houer """

    df = df.copy()
    df[time_column] = pd.to_datetime(df[time_column])
    df["hour_start"] = df[time_column].dt.floor("H")

    df[value_column] = pd.to_numeric(df[value_column], errors='coerce')

    hourly_avg = df.groupby("hour_start")[value_column].mean().reset_index()
    hourly_avg.columns = ["זמן התחלה", "ממוצע"]

    return hourly_avg

In [None]:
average_per_day=calculate_hourly_avg(df)
average_per_day.head()

  df["hour_start"] = df[time_column].dt.floor("H")


Unnamed: 0,זמן התחלה,ממוצע
0,2025-06-01 00:00:00,50.562894
1,2025-06-01 01:00:00,49.939803
2,2025-06-01 02:00:00,49.457213
3,2025-06-01 03:00:00,50.181573
4,2025-06-01 04:00:00,48.611496


In [None]:





def split_and_process_time_series(df, output_folder="daily_files", output_final="results/hourly_averages.csv"):
    """Splits time series data into daily chunks, calculates hourly averages per file, and merges results."""

    df=df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["day"] = df["timestamp"].dt.date

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    if not os.path.exists("results"):
        os.makedirs("results")

    hourly_avgs = []

    for day, day_df in df.groupby("day"):
        daily_file = f"{output_folder}/{day}.csv"
        day_df.to_csv(daily_file, index=False)


        small_df = pd.read_csv(daily_file)
        small_df["timestamp"] = pd.to_datetime(small_df["timestamp"])
        hourly_avg_df = calculate_hourly_avg(small_df, time_column="timestamp", value_column="value")

        hourly_avgs.append(hourly_avg_df)


    final_result = pd.concat(hourly_avgs, ignore_index=True)
    final_result.to_csv(output_final, index=False)




split_and_process_time_series(df)


  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_column].dt.floor("H")
  df["hour_start"] = df[time_co

# פתרון לשאלה 3 – איך לחשב ממוצעים שעתיים בזמן אמת כאשר הנתונים מוזרמים בגישת stream:

  לעבד כל רשומה באופן מיידי כאשר היא מתקבלת,
 ולאחסן מידע מצטבר עבור כל שעה.
 עבור כל שעה נשמור את סכום הערכים שהתקבלו עד כה ואת מספר הרשומות.

 נשתמש במבנה נתונים כמו מילון, שבו המפתח הוא שעת ההתחלה (למשל '6:00'),
 והערך הוא סכום הערכים + ספירת רשומות.

 בכל פעם שמתקבלת רשומה:
 1. נחלץ את השעה מתוך ה-timestamp.
 2. נעדכן את הסכום והספירה עבור אותה שעה.
 3. נחשב את הממוצע בריצה לפי: סכום חלקי כמות.

 כך נוכל לקבל בכל רגע את הממוצע העדכני לכל שעה, מבלי להמתין לסיום הזרם כולו.

 דוגמה לרשומה: timestamp = '6:10:00 2025-06-10', value = 12.6
 לאחר עיבוד הרשומה נוסיף את 12.6 לסכום של השעה 6:00 ונגדיל את הספירה באחד.





#   יתרונות Parquet


1.  ביצועי שאילתות בצורה מהירה: ניתן לגשת ישירות לעמודות ספציפיות מבלי לקרוא את כל הקובץ,
   מה שמייעל מאוד ביצועים, במיוחד בקבצים גדולים

2. דחיסת נתונים:
    קובצי Parquet דחוסים באופן אוטומטי, ולכן תופסים פחות מקום בדיסק
    ומאפשרים עיבוד מהיר יותר של נתונים גדולים.

3. תמיכה במבני נתונים מדויקים:
    הפורמט שומר על טיפוסי הנתונים של העמודות (כמו תאריכים או מספרים עשרוניים),
    מה שמונע צורך בהמרות ידניות כפי שלעיתים נדרש ב-CSV.


