In [None]:
import csv

import numpy as np
import pandas as pd
pd.set_option("display.max_columns", None)

# Data Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# System
import os

# Deep Learning

from typing import Dict, List, Tuple, Sequence


In [None]:
# Location of the pre-processed dataset (the path looks like a Colab-mounted
# Google Drive folder -- adjust when running elsewhere).
file_path = "/content/drive/MyDrive/data_finale_V4.csv"

# Comma-separated input; the delimiter is spelled out to keep the format explicit.
processed_data = pd.read_csv(file_path, delimiter=",")

# Cell output: the column labels of the loaded table.
processed_data.columns

In [None]:
# Remove the 'Unnamed: 0' column (presumably a row index saved by an earlier
# export -- confirm). Re-binding with errors='ignore' instead of using
# inplace=True keeps this cell re-runnable: executing it a second time no
# longer raises KeyError because the column is already gone.
processed_data = processed_data.drop(columns='Unnamed: 0', errors='ignore')

# Parse the 'Date' strings (YYYY-MM-DD) into datetime values.
processed_data['Date'] = pd.to_datetime(processed_data['Date'], format='%Y-%m-%d')

# Cell output: the dtype of every column after the conversion.
processed_data.dtypes

In [None]:
def get_cells_data(df) -> np.ndarray:
    """Split the dataset into one chronologically ordered feature table per cell.

    A cell is identified by its ("eNodeB identity", "Cell ID") pair. Cells are
    emitted in ascending order of "eNodeB_identifier_int".

    Args:
        df (pd.DataFrame): full dataset. It must contain the columns "Date",
            "eNodeB identity", "Cell ID" and "eNodeB_identifier_int"; every
            other column is treated as a feature.

    Returns:
        np.ndarray: the per-cell feature tables combined with ``np.array``.
            Each table holds the rows of one cell ordered by "Date", with
            "Date" and the three identifier columns removed.
            NOTE(review): a regular (cells, timesteps, features) array is only
            obtained when every cell has the same number of rows; recent NumPy
            versions raise ValueError on ragged input -- confirm the dataset
            guarantees equal-length cells.
    """
    id_columns = ["eNodeB identity", "Cell ID", "eNodeB_identifier_int"]

    # De-duplicate first (fewer rows to sort); the stable sort keeps the cell
    # order deterministic when several cells share the same eNodeB.
    cells = (
        df[id_columns]
        .drop_duplicates()
        .sort_values(by="eNodeB_identifier_int", kind="stable")
    )

    data = []
    # itertuples yields plain tuples, which avoids the deprecated positional
    # Series lookups (row[0], row[1]) needed with iterrows.
    for enodeb_identity, cell_id, _ in cells.itertuples(index=False, name=None):
        in_cell = (df["eNodeB identity"] == enodeb_identity) & (df["Cell ID"] == cell_id)

        # Order each cell by time. The previous sort key
        # ("eNodeB_identifier_int") is presumably constant inside a cell, so it
        # ordered nothing and, being non-stable, could even shuffle the rows.
        df_cell = (
            df[in_cell]
            .sort_values(by="Date", kind="stable")
            .reset_index(drop=True)
            .drop(columns=["Date"] + id_columns)
        )
        data.append(df_cell.to_numpy())

    return np.array(data)

In [None]:
def train_test_split(data_cell:pd.DataFrame,
                     train_test_ratio: float,
                     input_length: int) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split the chronological data of one cell into a train and a test part.

    - The train part holds every timestep before
      ``round(train_test_ratio * len(data_cell))``.
    - The test part starts ``input_length`` timesteps before that boundary, so
      that the first test target lies right after the train part while its
      input window is still complete. The two parts therefore overlap by up to
      ``input_length`` rows.

    Args:
        data_cell (pd.DataFrame): chronological data of one cell
        train_test_ratio (float): share of the rows used for training (0-1)
        input_length (int): length of each X_i, e.g. 90 days (3 months)

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: (data_cell_train, data_cell_test)
    """
    # TRAIN SET
    # ======================
    last_train_idx = round(train_test_ratio * len(data_cell))
    data_cell_train = data_cell.iloc[0:last_train_idx, :]

    # TEST SET
    # ======================
    # Clamp at 0: a negative start would make iloc count from the END of the
    # frame and silently shrink the test set when input_length exceeds the
    # size of the train part.
    first_test_idx = max(0, last_train_idx - input_length)
    data_cell_test = data_cell.iloc[first_test_idx:, :]

    return (data_cell_train, data_cell_test)

In [None]:
def get_Xi_yi(
    data_cell:pd.DataFrame,
    input_length:int,
    output_length:int,
    start_index:int) -> Tuple[pd.DataFrame]:
    """Cut a single (X_i, y_i) sequence pair out of the data of one cell.

    The input window starts at the caller-supplied ``start_index``; the target
    window follows it immediately.

    Args:
        data_cell (pd.DataFrame): data of a single cell
        input_length (int): number of rows in X_i (e.g. 3 months)
        output_length (int): number of rows in y_i (e.g. 1 month)
        start_index (int): positional index of the first row of X_i

    Returns:
        Tuple[pd.DataFrame]: (X_i, y_i). X_i keeps every column of
        ``data_cell``; y_i keeps only the 'Trafic LTE.float' column.
    """
    # Positional boundaries of the two consecutive windows.
    input_end = start_index + input_length
    target_end = input_end + output_length

    X_i = data_cell.iloc[start_index:input_end]

    # Slice the target rows first, then keep the target column as a
    # one-column DataFrame (list selector, not a Series).
    target_rows = data_cell.iloc[input_end:target_end]
    y_i = target_rows.loc[:, ['Trafic LTE.float']]

    return X_i, y_i

In [None]:
def get_X_y(
    cell_data:pd.DataFrame,
    input_length:int,
    output_length:int) -> Tuple[list, list]:
    """Generate every (X_i, y_i) sliding-window sequence of one cell.

    Windows start at positions 0, 1, 2, ... and advance one timestep at a
    time, for as long as a full input window plus a full target window fit.

    Args:
        cell_data (pd.DataFrame): cell dataframe
        input_length (int): Length of each X_i
        output_length (int): Length of each y_i

    Returns:
        Tuple[list, list]: (X, y), two lists of DataFrames of equal length.
        Each X_i holds the numeric feature columns without
        'Trafic LTE.float'; each y_i holds only 'Trafic LTE.float'. Both lists
        are empty when ``cell_data`` is too short for a single sequence.
    """
    X, y = [], []

    number_of_sequences = len(cell_data) - (input_length + output_length) + 1
    if number_of_sequences <= 0:
        return X, y

    # Coerce to numeric ONCE for the whole cell (non-numeric values become
    # NaN) instead of repeating the conversion for every overlapping window.
    # This also gives every window the same column dtypes.
    numeric_data = cell_data.apply(pd.to_numeric, errors='coerce')

    for start_index in range(number_of_sequences):
        X_i, y_i = get_Xi_yi(numeric_data, input_length, output_length, start_index)

        # The target column is not used as an input feature; drop() returns a
        # new frame, so the window sliced from the cell data is left untouched.
        X.append(X_i.drop(columns='Trafic LTE.float'))
        y.append(y_i)

    return X, y

In [None]:
# One feature table per cell, built from the cleaned dataset.
cells_data = get_cells_data(df=processed_data)

In [None]:
# Window sizes in timesteps: length of each input sequence and of each target.
INPUT_LENGTH = 90
OUTPUT_LENGTH = 30

# Feature column names: every dataset column except the date and the cell
# identifiers. list.remove raises ValueError if one of them is missing.
columns = list(processed_data.columns)
for dropped_name in ('Date', 'eNodeB identity', 'Cell ID', 'eNodeB_identifier_int'):
    columns.remove(dropped_name)

In [None]:
# The memory of Colab is fully used when arriving at about 2283 cells, so only
# the first MAX_CELLS cells are turned into sequences.
MAX_CELLS = 1000
TRAIN_TEST_RATIO = 0.8

# Create the accumulators up front: the arrays built after the loop then exist
# even when `cells_data` is empty, and extend() appends in place instead of
# copying the ever-growing lists on every iteration.
X_train_all, y_train_all, X_test_all, y_test_all = [], [], [], []

cell_index = 0
for cell_data in cells_data[:MAX_CELLS]:
    print(cell_index)

    # Re-attach the feature names that were lost when the per-cell tables were
    # packed into a NumPy array.
    cell_i_data = pd.DataFrame(cell_data)
    cell_i_data.columns = columns

    # The test part must reach back one full input window, so the split uses
    # the same INPUT_LENGTH as the sequence generation below.
    data_cell_i_train, data_cell_i_test = train_test_split(
        cell_i_data, TRAIN_TEST_RATIO, INPUT_LENGTH)

    X_train, y_train = get_X_y(data_cell_i_train, INPUT_LENGTH, OUTPUT_LENGTH)
    X_test, y_test = get_X_y(data_cell_i_test, INPUT_LENGTH, OUTPUT_LENGTH)

    X_train_all.extend(X_train)
    y_train_all.extend(y_train)
    X_test_all.extend(X_test)
    y_test_all.extend(y_test)
    print(len(X_train_all), len(y_train_all), len(X_test_all), len(y_test_all))

    cell_index += 1

X_train_all_array = np.array(X_train_all)
y_train_all_array = np.array(y_train_all)
X_test_all_array = np.array(X_test_all)
y_test_all_array = np.array(y_test_all)

# Cell output: shapes of the four arrays.
X_train_all_array.shape, y_train_all_array.shape, X_test_all_array.shape, y_test_all_array.shape

In [None]:
# Save X and y into csv files
file_path_X_train="/content/drive/MyDrive/X_train_all.csv"
file_path_y_train="/content/drive/MyDrive/y_train_all.csv"
file_path_X_test="/content/drive/MyDrive/X_test_all.csv"
file_path_y_test="/content/drive/MyDrive/y_test_all.csv"


# csv.writer stringifies every field with str(); iterating the multi-dimensional
# array directly would therefore store the text form of a whole NumPy row
# ("[0.1 0.2 ...]") in each field. Flatten each sequence so that one CSV row
# holds the plain numeric values of one sequence. After loading, reshape every
# row back to the per-sequence shape (X_train_all_array.shape[1:]).
with open(file_path_X_train, 'w', newline='') as csvfile_X_train:
    writer = csv.writer(csvfile_X_train, delimiter=',')
    writer.writerows(np.ravel(sequence) for sequence in X_train_all_array)


In [None]:
# One CSV row per target sequence. Each sequence is flattened first, because
# csv.writer would otherwise store the text form of a NumPy row ("[0.5]") in
# every field instead of the number itself.
with open(file_path_y_train, 'w', newline='') as csvfile_y_train:
    writer = csv.writer(csvfile_y_train, delimiter=',')
    writer.writerows(np.ravel(sequence) for sequence in y_train_all_array)

In [None]:
# One CSV row per input sequence. Each sequence is flattened first, because
# csv.writer would otherwise store the text form of a whole NumPy row
# ("[0.1 0.2 ...]") in every field. After loading, reshape every row back to
# the per-sequence shape (X_test_all_array.shape[1:]).
with open(file_path_X_test, 'w', newline='') as csvfile_X_test:
    writer = csv.writer(csvfile_X_test, delimiter=',')
    writer.writerows(np.ravel(sequence) for sequence in X_test_all_array)

In [None]:
# One CSV row per target sequence. Each sequence is flattened first, because
# csv.writer would otherwise store the text form of a NumPy row ("[0.5]") in
# every field instead of the number itself.
with open(file_path_y_test, 'w', newline='') as csvfile_y_test:
    writer = csv.writer(csvfile_y_test, delimiter=',')
    writer.writerows(np.ravel(sequence) for sequence in y_test_all_array)