In [1]:
####################################
######## LIBRARIES IMPORT ##########
####################################
import os
from pyspark.sql import SparkSession
import findspark
import databricks.koalas as ks
import pyspark.pandas as ps
import pandas as pd
from pathlib import Path
from IPython.display import display, clear_output


####################################
######## SPARK RUNNING ##########
####################################
# Point PySpark at the local Java and Spark installs (machine-specific paths).
os.environ["JAVA_HOME"] = "/usr"
os.environ["SPARK_HOME"] = "/opt/spark"
findspark.init()
# Start a local Spark session on all available cores, or reuse an existing one.
spark = SparkSession.builder.master("local[*]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/23 19:21:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/11/23 19:21:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
####################################
######## LIBRARIES IMPORT ##########
####################################
import os
from pyspark.sql import SparkSession
import findspark
import databricks.koalas as ks
import pyspark.pandas as ps
import pandas as pd
from pathlib import Path
from IPython.display import display, clear_output


####################################
######## SPARK RUNNING ##########
####################################
# Point PySpark at the local Java and Spark installs (machine-specific paths).
os.environ["JAVA_HOME"] = "/usr"
os.environ["SPARK_HOME"] = "/opt/spark"
findspark.init()
# Start a local Spark session on all available cores, or reuse an existing one.
spark = SparkSession.builder.master("local[*]").getOrCreate()
# The string literal below is a disabled alternative session builder that adds
# the Cassandra connection settings. As a bare string it has no effect, so the
# active session above carries no spark.cassandra.* configuration.
'''
spark = SparkSession.builder \
    .appName('SparkCassandraApp') \
    .config('spark.cassandra.connection.host', 'localhost') \
    .config('spark.cassandra.connection.port', '9042') \
    .config('spark.cassandra.output.consistency.level','ONE') \
    .master("local[*]") \
    .getOrCreate()
'''
# Allow Koalas operations that combine objects coming from different frames
# (several helpers assign a freshly built ks.Series to an existing dataframe).
ks.set_option('compute.ops_on_diff_frames', True)

####################################
######## PATH SETTINGS ##########
####################################
# Default directory prefix used by import_json when no path is passed.
path_1 = "./"

# HELPER FUNCTIONS

def values_type(dataframe, column): 
    """
    Collect the distinct Python types found in one column of a dataframe.

    Parameters:
    - dataframe: pandas or koalas dataframe
    - column: the name of the column to analyze

    Returns:
    - a set with the type of every value in the column
    """
    return {type(item) for item in dataframe[column].to_numpy()}

# DEALING WITH DUPLICATED REGISTERS

def drop_duplicates(Table):
    """
    Remove repeated rows from a dataframe.

    Parameters:
    - Table: Pandas or Koalas dataframe

    Returns:
    - a new dataframe without duplicated rows (index labels are kept)
    """
    deduplicated = Table.drop_duplicates()
    return deduplicated


####################################
######## COMMON FUNCTIONS ##########
####################################
def import_json(file:str, path:Path = path_1, format:str = 'json'):
    '''
    Read a JSON-lines file into a Koalas DataFrame and print its shape.

    Arguments:
    :: file: str of the file name
    :: path: str or Path of the directory where the file is stored; a missing
             trailing separator is added automatically
    :: format: 'str' file format (currently unused; the file is always read
               with ks.read_json)

    Returns: 
    ---------
    Koalas DataFrame with the file contents
    '''
    # str() lets callers pass a pathlib.Path (plain `+` would raise TypeError),
    # and the separator check avoids building "./datafile.json" by accident.
    base = str(path)
    if base and not base.endswith(('/', '\\')):
        base += '/'
    path_final = base + file
    print('READING JSON')
    df = ks.read_json(path_final, lines=True)
    print(f"Shape of {file} is {df.shape}")
    return df
    

def upload_to_cassandra(df, table_name):
    """
    Append a dataframe to a table of the 'yelp' Cassandra keyspace.

    Parameters:
    - df: Koalas / pandas-on-Spark dataframe, or a Spark DataFrame
    - table_name: name of the destination Cassandra table
    """
    # Koalas dataframes have no `.write` attribute; only Spark DataFrames do,
    # so convert first when the object offers `to_spark()`.
    spark_df = df.to_spark() if hasattr(df, 'to_spark') else df
    spark_df.write.format("org.apache.spark.sql.cassandra")\
    .options(table=table_name, keyspace="yelp")\
    .mode('append')\
    .save()


# ID VALIDATION

def check_id_chars(Table, id_column):
    """
    Checks if the values in an ID column are strings with the required
    number of characters (22).
    Returns a list of indexes at which the column has an invalid ID.
    Values that are not strings (e.g. missing values) count as invalid.

    Parameters:
    - Table: Pandas or Koalas dataframe
    - id_column: column containing 22 character ID's
    """
    return [
        index
        for index, value in Table[id_column].items()
        # the isinstance check keeps len() from failing on None / NaN
        if not isinstance(value, str) or len(value) != 22
    ]

def drop_bad_ids(Table, id_column):
    """
    This function removes the rows in a table where an ID is not valid,
    i.e. where the value in `id_column` is not a 22 character string.
    Returns a table with only valid ID's in the passed column, re-indexed
    from 0.

    Parameters:
    - Table: Pandas or Koalas dataframe
    - id_column: column containing 22 character ID's
    """
    # Build the mask from the column itself so it shares the table's index.
    # A mask rebuilt from a Python list gets a fresh 0..n-1 index and would
    # line up with the wrong rows whenever the table index has gaps (for
    # instance after drop_duplicates). Missing IDs give a null length and
    # therefore fail the comparison.
    valid = Table[id_column].str.len() == 22
    return Table[valid].reset_index(drop=True)

# NUMERIC VALUES

def impute_num(Table, col_list, absolute=False):
    """
    This function replaces missing values in numeric columns with 0.
    If the 'absolute' parameter is passed, the function also converts 
    the numeric columns into their absolute value.
    The table is modified in place; nothing is returned.

    Parameters:
    - Table: Pandas or Koalas dataframe
    - col_list: list of numeric columns with missing values to be imputed
    - absolute: boolean, decides if the column will contain absolute values. Default: False.
    """
    for col in col_list:
        # fillna returns a new series, so the result has to be assigned back
        filled = Table[col].fillna(0)
        Table[col] = filled.abs() if absolute else filled

# STRING VALUES

def clean_string(string):
    """
    This function cleans strings by removing whitespaces at the beginning and 
    at the end of the string, collapsing every run of repeated spaces into a
    single space and converting the string to lower case.
    Returns a clean string.

    Parameters:
    - string: some string to be cleaned
    """
    trimmed = string.strip()
    # Splitting on a single space and dropping the empty pieces collapses runs
    # of any length; one replace('  ', ' ') pass leaves "a   b" as "a  b".
    collapsed = ' '.join(piece for piece in trimmed.split(' ') if piece)
    return collapsed.lower()

def drop_bad_str(Table, col):
    """
    This function takes a Dataframe and the name of a column that contains string values, 
    imputes missing values in the column with 'NO DATA', cleans its strings with
    'clean_string' and removes registers where the cleaned string has 2 or less characters.
    The function returns a copy of the dataframe after performing the above mentioned
    transformations, re-indexed from 0. The input dataframe is not modified.

    Parameters:
    - Table: Pandas or Koalas dataframe
    - col: string, the name of the column to transform
    """
    T_ok = Table.copy()
    T_ok[col] = T_ok[col].fillna('NO DATA')
    T_ok[col] = T_ok[col].apply(clean_string)
    # The mask comes from the cleaned column itself, so it shares T_ok's index.
    # A mask rebuilt from a Python list would get a fresh 0..n-1 index and
    # select the wrong rows whenever the table index has gaps.
    long_enough = T_ok[col].str.len() > 2
    return T_ok[long_enough].reset_index(drop=True)


# DATETIME VALUES

def transform_dates(dataframe,column,format):
    """
    Parse a timestamp column and render every value as text in the given format.
    Values that cannot be parsed are coerced to NaT and then replaced by the
    most frequent parsed timestamp of the column (rendered in the same format).

    Parameters:
    - dataframe: a Koalas dataframe
    - column: the name of the column containing timestamp values
    - format: the strftime format to which the values will be rendered

    Returns:
    - a series of formatted strings (the dataframe itself is not returned)
    """
    parsed = ks.to_datetime(dataframe[column], errors='coerce')
    most_common = parsed.mode().iloc[0].strftime(format)

    def _render(stamp):
        # unparseable values fall back to the column's most frequent date
        if stamp is pd.NaT:
            return most_common
        return stamp.strftime(format)

    return parsed.apply(_render)

# LISTS OF STRINGS

def check_str_list(ls):
    """
    This function receives a list and returns a second list containing only the strings from the
    original list. In case there were none, it returns an empty list. If a None value (or any
    other non-iterable value) is passed, the function returns an empty list.

    NOTE(review): a plain string is itself iterable, so passing one returns its individual
    characters. Confirm that callers really pass lists.
    """
    if ls is None:
        return []
    try:
        return [x for x in ls if isinstance(x, str)]
    except TypeError:
        # not iterable (e.g. a number or NaN): treat it like a missing list
        return []

# DICTIONARY

def row_hours_to_list(row):
    """
    Convert a row of opening hours into a list of [day, opening, closing] lists,
    one per key of the row, e.g. [[1,8,18],[2,8,18]...]. Days are numbered
    Monday=1 ... Sunday=7 and a day whose value is None becomes [day, 0, 0].

    NOTE(review): each time is computed as int(hours) + int(minutes), so
    '8:30' yields 38. Confirm whether only the hour was intended.

    Parameters:
    - row: pyspark row object whose fields are day names and whose values are
           'H:M-H:M' strings or None
    """
    day_numbers = {
        'Monday': 1, 'Tuesday': 2, 'Wednesday': 3, 'Thursday': 4,
        'Friday': 5, 'Saturday': 6, 'Sunday': 7,
    }

    def _clock_value(clock):
        pieces = clock.split(':')
        return int(pieces[0]) + int(pieces[1])

    fields = row.asDict()
    # split every 'open-close' string first; non-string values pass through
    spans = [raw.split('-') if isinstance(raw, str) else raw for raw in fields.values()]

    schedule = []
    for day, span in zip(fields.keys(), spans):
        if span is None:
            schedule.append([day_numbers[day], 0, 0])
        else:
            schedule.append([day_numbers[day], _clock_value(span[0]), _clock_value(span[1])])
    return schedule

def row_hours_to_series(series):
    """
    Convert a column of opening-hours rows into a koalas series of schedules.
    Every element is turned into the list-of-lists format produced by
    'row_hours_to_list'; elements that are None are replaced by the converted
    mode (most frequent value) of the column.

    Parameters:
    - series: koalas series
    """
    fallback = row_hours_to_list(series.mode().iloc[0])
    converted = [
        fallback if entry is None else row_hours_to_list(entry)
        for _, entry in series.items()
    ]
    return ks.Series(converted)


def get_date_as_list(value):
    """Split a ', '-separated string of dates into a list of strings."""
    return value.split(', ')

def get_total_checkins(value):
    """Count the entries of a ', '-separated string of check-in dates."""
    return len(value.split(', '))

def get_state_city(df):
    """
    Add a 'state_city' column holding a [state, city] pair for every row.
    The dataframe is modified in place; progress messages are printed along
    the way. Both columns are pulled into Python lists before pairing.

    Parameters:
    - df: Koalas dataframe with 'city' and 'state' columns
    """
    print('SETTING OPTION')
    # needed to assign a series built outside the dataframe
    ks.set_option('compute.ops_on_diff_frames', True)
    print('GeTTING CITY LIST')
    city_values = list(df.city.to_numpy())
    print('GeTTING STATE LIST')
    state_values = list(df.state.to_numpy())
    print('OBTAINING SERIES')
    pairs = [[state_values[position], city] for position, city in enumerate(city_values)]
    paired_series = ks.Series(pairs)
    print('CREATING COLUMN')
    df['state_city'] = paired_series

22/11/23 19:21:55 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Turn Spark logging off for this session. The *EDA functions below catch
# upload errors themselves, so with logging off a failure leaves no Spark-side trace.
spark.sparkContext.setLogLevel("OFF")

In [None]:
def BusinessEDA():
    """
    Load business.json, clean it and append it to the 'business' Cassandra table.

    Steps: drop duplicated rows, keep only string entries in 'categories',
    build a combined 'state_city' column and drop 'city' and 'state'.

    Returns "Done" when the upload succeeds and None when it fails.
    """
    print('IMPORTING BUSINESS')
    df = import_json(file = 'business.json', path = './data/')
    print('DROPPING DUPLICATES')
    df = drop_duplicates(df)
    
    ######## OPEN HOURS ##########
    #df['hours'] = df['hours'].apply(row_hours_to_series)
    
    print('CHECKING STRINGS')
    ######## CATEGORIES ##########
    df['categories'] = df['categories'].apply(check_str_list)

    ######## CITY/STATE ##########
    print('GETTING STATE_CITY COL')
    get_state_city(df)
    print('DROPPING CITY & STATE COLS')
    df = df.drop(['city', 'state'], axis=1)

    print('TRYING TO UPLOAD')

    try:
        upload_to_cassandra(df, 'business')
        print('Business uploaded to Cassandra')
        return "Done"
    except Exception as error:
        # report the cause instead of swallowing it, so failures can be diagnosed
        print('ERROR uploading BUSINESS to Cassandra')
        print(f'Reason: {error!r}')

BusinessEDA()

In [None]:
print('CHECKIN')

def CheckinEDA():
    """
    Load checkin.json, clean it and append it to the 'checkin' Cassandra table.

    Steps: drop duplicated rows and split the ', '-separated 'date' string of
    every row into a list of dates.

    Returns "Done" when the upload succeeds and None when it fails.
    """
    print('IMPORTING')
    df = import_json(file = 'checkin.json', path = './data/')

    print('DROPPING DUPS')
    df = drop_duplicates(df)
    
    print('GETTING DATE LIST')
    df['date'] = df['date'].apply(get_date_as_list)

    #print('GETTING TOTAL')
    #df['total'] = df['date'].apply(get_total_checkins)

    try:
        upload_to_cassandra(df, 'checkin')
        print('Checkin uploaded to Cassandra')
        return "Done"
    except Exception as error:
        # report the cause instead of swallowing it, so failures can be diagnosed
        print('ERROR uploading CHECKIN to Cassandra')
        print(f'Reason: {error!r}')

CheckinEDA()

In [None]:
def TipsEDA():
    """
    Load tip.json, clean it and append it to the 'tips' Cassandra table.

    Steps: drop duplicated rows, clean the 'text' column and drop rows whose
    text is too short (see drop_bad_str), and render 'date' as 'YYYY-MM-DD'.

    Returns "Done" when the upload succeeds and None when it fails.
    """
    df = import_json(file = 'tip.json', path = './data/')

    df = drop_duplicates(df)

    df = drop_bad_str(df, 'text')

    df['date'] = transform_dates(df, 'date', '%Y-%m-%d')

    try:
        upload_to_cassandra(df, 'tips')
        print('Tips uploaded to Cassandra')
        return "Done"
    except Exception as error:
        # report the cause instead of swallowing it, so failures can be diagnosed
        print('ERROR uploading TIPS to Cassandra')
        print(f'Reason: {error!r}')

TipsEDA()

In [25]:
def UserEDA():
    """
    Load user.json, clean it and append it to the 'users' Cassandra table.

    Steps: drop duplicated rows, pass 'friends' and 'elite' through
    check_str_list, and render 'yelping_since' as 'YYYY-MM-DD'.

    Returns "Done" when the upload succeeds and None when it fails.
    """
    df = import_json(file = 'user.json', path = './data/')
    df = drop_duplicates(df)

    df['friends'] = df['friends'].apply(check_str_list)

    df['elite'] = df['elite'].apply(check_str_list)

    df['yelping_since'] = transform_dates(df, 'yelping_since', '%Y-%m-%d')

    try:
        upload_to_cassandra(df, 'users')
        print('Users uploaded to Cassandra')
        return "Done"
    except Exception as error:
        # report the cause instead of swallowing it, so failures can be diagnosed
        print('ERROR uploading USERS to Cassandra')
        print(f'Reason: {error!r}')

UserEDA()

READING JSON


                                                                                

Shape of user.json is (1987897, 22)




ERROR uploading USERS to Cassandra


In [4]:
def ReviewEDA():
    """
    Load review.json, clean it and append it to the 'reviews' Cassandra table.

    Steps: drop duplicated rows and render 'date' as 'YYYY-MM-DD'. ID
    validation and vote imputation are currently disabled (commented out).

    Returns "Done" when the upload succeeds and None when it fails.
    """
    df = import_json(file = 'review.json', path = './data/')
    
    print('DELETING DUPLICATES')
    df = drop_duplicates(df)

    #print('DELETING BAD ID')
    #df['user_id'] = drop_bad_ids(df, 'user_id')
    #df['business_id'] = drop_bad_ids(df, 'business_id')
    #df['review_id'] = drop_bad_ids(df, 'review_id')

    #print('IMPUTING NEGATIVE VOTES')
    #impute_num(df, ['useful', 'funny', 'cool'], True) ##### REALLY SLOW

    print('TRANSFORMING DATES')
    df['date'] = transform_dates(df, 'date', '%Y-%m-%d')

    try:
        upload_to_cassandra(df, 'reviews')
        print('Reviews uploaded to Cassandra')
        return "Done"
    except Exception as error:
        # report the cause instead of swallowing it, so failures can be diagnosed
        print('ERROR uploading REVIEWS to Cassandra')
        print(f'Reason: {error!r}')

ReviewEDA()

READING JSON


                                                                                

Shape of review.json is (6990280, 9)
DELETING DUPLICATES
IMPUTING NEGATIVE VOTES




TRANSFORMING DATES




ERROR uploading REVIEWS to Cassandra




In [None]:
####################################
######## LIBRARIES IMPORT ##########
####################################
import requests
from datetime import datetime
from pathlib import Path
import os
import json
import ast
import pandas as pd
import numpy as np
from sklearn.preprocessing import MultiLabelBinarizer
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
import pyspark.pandas as ps
import databricks.koalas as ks

from transform_funcs import *

####################################
######## SPARK RUNNING ##########
####################################
# Machine-specific Java and Spark locations.
# NOTE(review): findspark.init() already ran in the import section above, so
# these variables are set after findspark looked for SPARK_HOME. Confirm the
# intended order (the later version of this script sets them first).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.3-bin-hadoop2.7"

# Local two-core session configured for a Cassandra node on localhost:9042,
# writing with consistency level ONE.
spark = SparkSession.builder \
    .appName('SparkCassandraApp') \
    .config('spark.cassandra.connection.host', 'localhost') \
    .config('spark.cassandra.connection.port', '9042') \
    .config('spark.cassandra.output.consistency.level','ONE') \
    .master('local[2]') \
    .getOrCreate()


####################################
######## PATH SETTINGS ##########
####################################
# Default directory prefix used by ImporterJSON.
path_1 = "./data/"


####################################
######## COMMON FUNCTIONS ##########
####################################
def ImporterJSON(file:str, path:Path = path_1, format:str = 'json'):
    '''
    Read a JSON-lines file into a pandas-on-Spark DataFrame and print its shape.

    Arguments:
    :: file: str of the file name
    :: path: str or Path of the directory where the file is stored; a missing
             trailing separator is added automatically
    :: format: 'str' file format (currently unused; the file is always read
               with ps.read_json)

    Returns: 
    ---------
    pandas-on-Spark DataFrame with the file contents
    '''
    # str() lets callers pass a pathlib.Path (plain `+` would raise TypeError),
    # and the separator check avoids building "./datafile.json" by accident.
    base = str(path)
    if base and not base.endswith(('/', '\\')):
        base += '/'
    path_final = base + file
    df = ps.read_json(path_final, lines=True)
    print(f"Shape of {file} is {df.shape}")
    return df

def UploadToCassandra(df, table_name):
    """
    Append a dataframe to a table of the 'yelp' Cassandra keyspace.

    Parameters:
    - df: pandas-on-Spark / Koalas dataframe, or a Spark DataFrame
    - table_name: name of the destination Cassandra table
    """
    # pandas-on-Spark dataframes have no `.write` attribute; only Spark
    # DataFrames do, so convert first when the object offers `to_spark()`.
    spark_df = df.to_spark() if hasattr(df, 'to_spark') else df
    spark_df.write.format("org.apache.spark.sql.cassandra")\
    .options(table=table_name, keyspace="yelp")\
    .mode('append')\
    .save()

def GetTime(datetime):
    """
    Return the hour component of every value of a datetime series.

    Parameters:
    - datetime: series exposing the `.dt` accessor (inside this function the
                name shadows the imported datetime class)
    """
    # `.dt.hour` is a property, not a method; calling it raises TypeError
    return datetime.dt.hour

def GetWordCount(str):
    """Count the whitespace-separated words of a text (the parameter shadows the builtin `str`)."""
    return len(str.split())

def CountItemsFromList(value):
    """
    Count the items of a ', '-separated string.

    An empty string and any non-string value (None, NaN) count as 0 items.
    Splitting '' would otherwise report one item and None would raise.

    Parameters:
    - value: string such as 'a, b, c'
    """
    if not isinstance(value, str) or value == '':
        return 0
    return len(value.split(', '))

def Dicc(row):
    """Evaluate the Python literal written in `row` (e.g. a dict rendered as text) with ast.literal_eval."""
    return ast.literal_eval(row)

def GetAVG(dates_list:list):
    """
    Return the rounded average hour of day over a set of timestamps.

    Parameters:
    - dates_list: despite the annotation, a single ', '-separated string of
                  'YYYY-MM-DD HH:MM:SS' timestamps
    """
    stamps = dates_list.split(', ')
    hours = [datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S').hour for stamp in stamps]
    return round(sum(hours) / len(stamps))

def GetEarliestYear(dates_list:list):
    """
    Return the earliest (smallest) year found in a set of timestamps.

    Parameters:
    - dates_list: despite the annotation, a single ', '-separated string of
                  'YYYY-MM-DD HH:MM:SS' timestamps
    """
    # take the minimum: starting from 0 and keeping larger years gave the LATEST year
    return min(
        datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S').year
        for stamp in dates_list.split(', ')
    )

def CountByYear(dates_list, year):
    """
    Count how many timestamps of a ', '-separated string fall in a given year.

    Parameters:
    - dates_list: string of 'YYYY-MM-DD HH:MM:SS' timestamps separated by ', '
    - year: the year (int) to count
    """
    return sum(
        1
        for stamp in dates_list.split(', ')
        if datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S').year == year
    )


####################################
    ######## REVIEWS  ##########
####################################

def ReviewEDA():
    """
    Load reviews.json, clean and enrich it, and append it to the 'reviews'
    Cassandra table.

    Steps: drop duplicated rows, drop rows with invalid user/business/review
    IDs, impute and take the absolute value of the vote counters, derive
    'hour' and 'year' from 'date' and add a 'word_count' for the review text.

    Returns "Done" when the upload succeeds and None when it fails.
    """
    df = ImporterJSON(file = 'reviews.json')
    
    df = drop_duplicates(df)

    # drop_bad_ids returns the filtered table, so it replaces the dataframe.
    # Assigning its result to a single column stores a whole table in it.
    # NOTE(review): assumes transform_funcs.drop_bad_ids returns a dataframe - confirm.
    df = drop_bad_ids(df, 'user_id')
    df = drop_bad_ids(df, 'business_id')
    df = drop_bad_ids(df, 'review_id')

    impute_num(df, ['useful', 'funny', 'cool'], True)

    df['hour'] = transform_dates(df, 'date', '%H')
    df['year'] = transform_dates(df, 'date', '%Y')

    # df['datetime'] = df.date.astype(datetime)
    # df['date'] = df.datetime.apply(lambda x: x.date())
    # df['hour'] = df.datetime.dt.hour
    # df['year'] = df.datetime.dt.year

    df['word_count'] = df.text.apply(GetWordCount)

    try:
        UploadToCassandra(df, 'reviews')
        print('Reviews uploaded to Cassandra')
        return "Done"
    except Exception as error:
        # report the cause instead of swallowing it, so failures can be diagnosed
        print('ERROR uploading REVIEWS to Cassandra')
        print(f'Reason: {error!r}')

    

####################################
    ######## USERS  ##########
####################################

def UserEDA():
    """
    Load users.json, derive summary columns and append it to the 'users'
    Cassandra table.

    Added columns: 'friends_number', 'n_interactions_send' (useful + funny +
    cool), 'n_interactions_received' (row sum of the compliment_* columns)
    and 'n_years_elite'.

    Returns "Done" when the upload succeeds and None when it fails.
    """
    df = ImporterJSON(file = 'users.json')
    df = drop_duplicates(df)
    df['friends_number'] = df['friends'].apply(CountItemsFromList)

    df['n_interactions_send'] = df['useful'] + df['funny'] + df['cool']

    df['n_interactions_received'] = df[[ 'compliment_hot',
    'compliment_more', 'compliment_profile', 'compliment_cute',
    'compliment_list', 'compliment_note', 'compliment_plain',
    'compliment_cool', 'compliment_funny', 'compliment_writer',
    'compliment_photos']].sum(axis=1)

    df['n_years_elite'] = df['elite'].apply(CountItemsFromList)
    df['n_years_elite'] = df['n_years_elite'].fillna(0)

    try:
        UploadToCassandra(df, 'users')
        print('Users uploaded to Cassandra')
        return "Done"
    except Exception as error:
        # report the cause instead of swallowing it, so failures can be diagnosed
        print('ERROR uploading USERS to Cassandra')
        print(f'Reason: {error!r}')



####################################
    ######## BUSINESS  ##########
####################################

def BusinessEDA():
    """
    Load business.json, drop duplicated rows, remove the nested 'attributes',
    'hours' and 'categories' columns and append the rest to the 'business'
    Cassandra table.

    The pandas-based flattening of attributes, opening hours and categories is
    disabled (commented out below), and so are the CSV exports that depended on it.

    Returns "Done" when the upload succeeds and None when it fails.
    """
    df = ImporterJSON(file = 'business.json')
    df = drop_duplicates(df)

    ######## ATRIBUTES ##########
    # attributes = pd.json_normalize(df['attributes'])
    # attributes['business_id'] = df.index
    # attributes.loc[attributes.BusinessParking == 'None']="{'garage': False, 'street': False, 'validated': False, 'lot': False, 'valet': False}"
    # attributes.BusinessParking = attributes.BusinessParking.fillna("{'garage': False, 'street': False, 'validated': False, 'lot': False, 'valet': False}") # null values are set to False
    # attributes.loc[attributes.Ambience == 'None']="{'romantic': False, 'intimate': False, 'classy': False, 'hipster': False, 'touristy': False, 'trendy': False, 'upscale': False, 'casual': False}"
    # attributes.Ambience = attributes.Ambience.fillna("{'romantic': False, 'intimate': False, 'classy': False, 'hipster': False, 'touristy': False, 'trendy': False, 'upscale': False, 'casual': False}")
    # parking = pd.json_normalize(attributes.BusinessParking.apply(Dicc))
    # ambience = pd.json_normalize(attributes.Ambience.apply(Dicc))
    # parking.set_index(df.index, inplace=True)
    # ambience.set_index(df.index, inplace=True)
    # attributes = attributes.drop(['BusinessParking', 'Ambience'], axis=1)
    # attributes = pd.concat([attributes, parking, ambience], axis=1)
    
    ######## OPEN HOURS ##########
    #openhours = pd.json_normalize(df['hours'])
    #openhours['business_id'] = df.index
    
    ######## CATEGORIES ##########
    df['categories'] = df['categories'].apply(check_str_list)
    #df['city_state'] = list(df['state'], df['city'])

    
    # categories = pd.json_normalize(df['categories'])
    # categories = df['categories'].str.split(', ', expand=True)
    # categories = categories.T.stack().groupby('business_id').apply(list).reset_index(name='categories')
    # mlb = MultiLabelBinarizer()
    # cat_full = categories.join(pd.DataFrame(mlb.fit_transform(categories.pop('categories')),
    #                         columns=mlb.classes_,
    #                         index=categories.index))

    ######## COPYING TO DATALAKE ##########
    # Disabled: `attributes`, `openhours` and `cat_full` are only built by the
    # commented-out code above, so these lines raised NameError on every call.
    # attributes.to_csv('./data/attributes.csv', index=False)
    # openhours.to_csv('./data/openhours.csv', index=False)
    # cat_full.to_csv('./data/categories.csv', index=False)

    # pandas-on-Spark's drop() has no `inplace` argument; rebind the result
    df = df.drop(['attributes', 'hours', 'categories'], axis=1)

    try:
        UploadToCassandra(df, 'business')
        print('Business uploaded to Cassandra')
        return "Done"
    except Exception as error:
        # report the cause instead of swallowing it, so failures can be diagnosed
        print('ERROR uploading BUSINESS to Cassandra')
        print(f'Reason: {error!r}')




####################################
    ######## CHECKIN  ##########
####################################

def CheckinEDA():
    """
    Load checkin.json, derive visit statistics and append it to the 'checkin'
    Cassandra table.

    Added columns: 'number_visits', 'avg_hour', 'earliest_year' and one
    column per year from 2010 to 2021 with that year's number of check-ins.

    Returns "Done" when the upload succeeds and None when it fails.
    """
    df = ImporterJSON(file = 'checkin.json')

    df = drop_duplicates(df)
    
    df['number_visits'] = df['date'].apply(CountItemsFromList)

    df['avg_hour'] = df['date'].apply(GetAVG)

    df['earliest_year'] = df['date'].apply(GetEarliestYear)

    for x in range(2010, 2022):
        df[str(x)] = df.date.apply(CountByYear, args=(x,))

    try:
        UploadToCassandra(df, 'checkin')
        print('Checkin uploaded to Cassandra')
        return "Done"
    except Exception as error:
        # report the cause instead of swallowing it, so failures can be diagnosed
        print('ERROR uploading CHECKIN to Cassandra')
        print(f'Reason: {error!r}')


####################################
    ######## TIPS  ##########
####################################

def TipsEDA():
    """
    Load tips.json, clean and enrich it, and append it to the 'tips'
    Cassandra table.

    Steps: drop duplicated rows, clean 'text' and drop rows whose text is too
    short, parse 'date' into timestamps, and add 'year' and 'word_count'.

    Returns "Done" when the upload succeeds and None when it fails.
    """
    df = ImporterJSON(file = 'tips.json')
    df = drop_duplicates(df)

    df = drop_bad_str(df, 'text')

    df['date'] = ps.to_datetime(df['date'])
    # the parsed timestamps live in 'date'; no 'datetime' column is created here
    df['year'] = df['date'].dt.year
    df['word_count'] = df.text.apply(GetWordCount)

    try:
        UploadToCassandra(df, 'tips')
        print('Tips uploaded to Cassandra')
        return "Done"
    except Exception as error:
        # report the cause instead of swallowing it, so failures can be diagnosed
        print('ERROR uploading TIPS to Cassandra')
        print(f'Reason: {error!r}')


####################################
######## SENTIMENT UPLOAD  #######
####################################

def SentimentUpload():
    """Placeholder for the sentiment upload step; it currently only returns an empty string."""
    placeholder = ""
    return placeholder

####################################
######### QUERY FUNCTIONS  ##########
####################################

def MakeQuery(query):
    """
    Read the 'business' table of the 'yelp' keyspace and show its first 10 rows.

    Parameters:
    - query: currently unused; the table and keyspace are fixed

    Returns "OK" when the rows were shown and "ERROR" otherwise.
    """
    try:
        # Use the session's own reader: SQLContext expects a SparkContext, not
        # a SparkSession. Loading inside the try block lets connection
        # problems produce "ERROR" too.
        ds = spark.read \
        .format('org.apache.spark.sql.cassandra') \
        .options(table='business', keyspace='yelp') \
        .load()
        ds.show(10) 
        print('Query executed')
        return "OK"
    except Exception as error:
        print('ERROR executing query')
        print(f'Reason: {error!r}')
        return "ERROR"


In [None]:
####################################
######## LIBRARIES IMPORT ##########
####################################

import requests
import pandas as pd
import numpy as np
from sklearn.preprocessing import MultiLabelBinarizer
from datetime import datetime
import databricks.koalas as ks
from pathlib import Path
import os
import json
import ast
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.3-bin-hadoop2.7"
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark.pandas as ps


####################################
######## SPARK RUNNING ##########
####################################
# Start a local Spark session on all available cores, or reuse an existing one.
spark = SparkSession.builder.master("local[*]").getOrCreate()

####################################
######## PATH SETTINGS ##########
####################################
# Default directory prefix used by ImporterJSON.
path_1 = "./data/"


####################################
######## COMMON FUNCTIONS ##########
####################################
def ImporterJSON(file:str, path:Path = path_1, format:str = 'json'):
    '''
    Load one file with Spark and convert it to a Koalas DataFrame, printing its shape.

    Arguments:
    :: file: str of the file name
    :: path: str or Path of the directory where the file is stored; a missing
             trailing separator is added automatically
    :: format: 'str' file format passed to the Spark reader

    Returns: 
    ---------
    Koalas DataFrame with the file contents
    '''
    base = str(path)
    if base and not base.endswith(('/', '\\')):
        base += '/'
    path_final = base + file
    # load the requested file; loading `path` would read the whole directory
    df = spark.read.load(path_final, format=format)
    df = df.to_koalas()
    print(df.shape)

    return df

def GetTime(datetime):
    """
    Return the hour component of every value of a datetime series.

    Parameters:
    - datetime: series exposing the `.dt` accessor (inside this function the
                name shadows the imported datetime class)
    """
    # `.dt.hour` is a property, not a method; calling it raises TypeError
    return datetime.dt.hour

def GetWordCount(str):
    """Count the whitespace-separated words of a text (the parameter shadows the builtin `str`)."""
    return len(str.split())

def CountItemsFromList(value):
    """
    Count the items of a ', '-separated string.

    An empty string and any non-string value (None, NaN) count as 0 items.
    Splitting '' would otherwise report one item and None would raise.

    Parameters:
    - value: string such as 'a, b, c'
    """
    if not isinstance(value, str) or value == '':
        return 0
    return len(value.split(', '))

def Dicc(row):
    """Evaluate the Python literal written in `row` (e.g. a dict rendered as text) with ast.literal_eval."""
    return ast.literal_eval(row)

def GetAVG(dates_list:list):
    """
    Return the rounded average hour of day over a set of timestamps.

    Parameters:
    - dates_list: despite the annotation, a single ', '-separated string of
                  'YYYY-MM-DD HH:MM:SS' timestamps
    """
    stamps = dates_list.split(', ')
    hours = [datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S').hour for stamp in stamps]
    return round(sum(hours) / len(stamps))

def GetEarliestYear(dates_list:list):
    """
    Return the earliest (smallest) year found in a set of timestamps.

    Parameters:
    - dates_list: despite the annotation, a single ', '-separated string of
                  'YYYY-MM-DD HH:MM:SS' timestamps
    """
    # take the minimum: starting from 0 and keeping larger years gave the LATEST year
    return min(
        datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S').year
        for stamp in dates_list.split(', ')
    )

def CountByYear(dates_list, year):
    """
    Count how many timestamps of a ', '-separated string fall in a given year.

    Parameters:
    - dates_list: string of 'YYYY-MM-DD HH:MM:SS' timestamps separated by ', '
    - year: the year (int) to count
    """
    return sum(
        1
        for stamp in dates_list.split(', ')
        if datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S').year == year
    )

####################################
    ######## REVIEWS  ##########
####################################

def ReviewEDA():
    """
    Load reviews.json and derive date and text features.

    Adds 'datetime' (the 'date' column cast with astype), replaces 'date' with
    the calendar date only, and adds 'hour', 'year' and 'word_count'.

    Returns the enriched dataframe.
    """
    df = ImporterJSON(file = 'reviews.json')

    # Keep the full timestamp in its own column before 'date' is overwritten.
    df['datetime'] = df.date.astype(datetime)
    df['date'] = df.datetime.apply(lambda x: x.date())
    df['hour'] = df.datetime.dt.hour
    df['year'] = df.datetime.dt.year
    df['word_count'] = df.text.apply(GetWordCount)

    return df



####################################
    ######## USERS  ##########
####################################

def UserEDA():
    """
    Load users.json and derive per-user summary columns.

    Added columns: 'friends_number', 'n_interactions_send' (useful + funny +
    cool), 'n_interactions_received' (row sum of the compliment_* columns)
    and 'n_years_elite' (missing counts filled with 0).

    Returns the enriched dataframe.
    """
    users = ImporterJSON(file = 'users.json')

    users['friends_number'] = users['friends'].apply(CountItemsFromList)

    users['n_interactions_send'] = users['useful'] + users['funny'] + users['cool']

    compliment_columns = [
        'compliment_hot', 'compliment_more', 'compliment_profile',
        'compliment_cute', 'compliment_list', 'compliment_note',
        'compliment_plain', 'compliment_cool', 'compliment_funny',
        'compliment_writer', 'compliment_photos',
    ]
    users['n_interactions_received'] = users[compliment_columns].sum(axis=1)

    users['n_years_elite'] = users['elite'].apply(CountItemsFromList)
    users['n_years_elite'] = users['n_years_elite'].fillna(0)

    return users



####################################
    ######## BUSINESS  ##########
####################################

def BusinessEDA():
    """
    Load business.json, flatten its nested columns into side tables and return
    the remaining business columns.

    Side effects: writes ./data/attributes.csv (attributes with the parking
    and ambience flags expanded), ./data/openhours.csv and
    ./data/categories.csv (one indicator column per category).

    NOTE(review): this function mixes pandas helpers (pd.json_normalize,
    pd.concat) with the Koalas dataframe returned by ImporterJSON - confirm
    the inputs are accepted as-is.

    Returns the dataframe without 'attributes', 'hours' and 'categories'.
    """
    df = ImporterJSON(file = 'business.json')

    # Default literals used when a business has no parking / ambience info.
    no_parking = "{'garage': False, 'street': False, 'validated': False, 'lot': False, 'valet': False}"
    no_ambience = "{'romantic': False, 'intimate': False, 'classy': False, 'hipster': False, 'touristy': False, 'trendy': False, 'upscale': False, 'casual': False}"

    ######## ATRIBUTES ##########
    attributes = pd.json_normalize(df['attributes'])
    attributes['business_id'] = df.index
    # Target only the affected column: `.loc[mask] = value` overwrote EVERY
    # column of the matching rows (including business_id) with the literal.
    attributes.loc[attributes.BusinessParking == 'None', 'BusinessParking'] = no_parking
    attributes.BusinessParking = attributes.BusinessParking.fillna(no_parking) # null values are set to all-False
    attributes.loc[attributes.Ambience == 'None', 'Ambience'] = no_ambience
    attributes.Ambience = attributes.Ambience.fillna(no_ambience)
    parking = pd.json_normalize(attributes.BusinessParking.apply(Dicc))
    ambience = pd.json_normalize(attributes.Ambience.apply(Dicc))
    parking.set_index(df.index, inplace=True)
    ambience.set_index(df.index, inplace=True)
    attributes = attributes.drop(['BusinessParking', 'Ambience'], axis=1)
    attributes = pd.concat([attributes, parking, ambience], axis=1)
    
    ######## OPEN HOURS ##########
    openhours = pd.json_normalize(df['hours'])
    openhours['business_id'] = df.index
    
    ######## CATEGORIES ##########
    # (a json_normalize of 'categories' used to be computed here and was
    # immediately overwritten by the split below, so it has been removed)
    categories = df['categories'].str.split(', ', expand=True)
    categories = categories.T.stack().groupby('business_id').apply(list).reset_index(name='categories')
    mlb = MultiLabelBinarizer()
    cat_full = categories.join(pd.DataFrame(mlb.fit_transform(categories.pop('categories')),
                            columns=mlb.classes_,
                            index=categories.index))

    ######## COPYING TO DATALAKE ##########
    attributes.to_csv('./data/attributes.csv', index=False)
    openhours.to_csv('./data/openhours.csv', index=False)
    cat_full.to_csv('./data/categories.csv', index=False)

    # Koalas' drop() has no `inplace` argument; rebind the result instead
    df = df.drop(['attributes', 'hours', 'categories'], axis=1)

    return df



####################################
    ######## CHECKIN  ##########
####################################

def CheckinEDA():
    """
    Load checkin.json and derive visit statistics from its 'date' column.

    Added columns: 'number_visits', 'avg_hour', 'earliest_year' and one
    column per year from 2010 to 2021 with that year's number of check-ins.

    Returns the enriched dataframe.
    """
    checkins = ImporterJSON(file = 'checkin.json')

    # one derived column per summary function, all computed from 'date'
    summaries = (
        ('number_visits', CountItemsFromList),
        ('avg_hour', GetAVG),
        ('earliest_year', GetEarliestYear),
    )
    for column_name, summarise in summaries:
        checkins[column_name] = checkins['date'].apply(summarise)

    for year in range(2010, 2022):
        checkins[str(year)] = checkins.date.apply(CountByYear, args=(year,))

    return checkins



####################################
    ######## TIPS  ##########
####################################

def TipsEDA():
    """
    Load tips.json, parse its 'date' column into timestamps and add the
    'year' and 'word_count' columns.

    Returns the enriched dataframe.
    """
    df = ImporterJSON(file = 'tips.json')

    df['date'] = ps.to_datetime(df['date'])
    # the parsed timestamps live in 'date'; no 'datetime' column is created here
    df['year'] = df['date'].dt.year
    df['word_count'] = df.text.apply(GetWordCount)

    return df





























# def trim_all_columns(df):
#     """
#     > If the value is a string, strip whitespace from the ends of the string. Otherwise, return the value
    
#     Arguments: 
#     --------------------------------   
#     :param df: The dataframe you want to trim

#     Return
#     --------------------------------
#     A dataframe with all the values trimmed.
#     """
#     trim_strings = lambda x: x.strip() if isinstance(x, str) else x
#     return df.applymap(trim_strings)

# def normalize_column(df, column_name):
#     """
#     > This function takes a dataframe and a column name as input, and returns a new dataframe with the
#     column normalized

#     Arguments: 
#     --------------------------------
#     :param df: the dataframe
#     :param column_name: The name of the column you want to normalize
#     """
#     df[column_name] = df[column_name].str.normalize('NFKD').str.encode('ascii', errors='ignore').str.decode('utf-8')
#     return df[column_name]

# # Source: https://stackoverflow.com/questions/9662346/python-code-to-remove-html-tags-from-a-string
# def clean_values(series, to_replace, value = '', regex = True):
#     """
#     > This function takes a pandas series, a list of values to replace, and a value to replace them with,
#     and returns a series with the values replaced.

#         Arguments: 
#     --------------------------------
#     :param series: the series you want to clean
#     :param to_replace: The value or list of values to be replaced
#     :param value: the value to replace the to_replace values with
#     :param regex: If True, assumes the to_replace pattern is a regular expression, defaults to True
#     (optional)
#     """
#     for i in to_replace:
#         series = series.str.replace(i, value, regex=regex)
#     return series

# def get_lat_lon(address, access_key = '2e843c7ee44a8f52742a8168d0121a0a', URL = "http://api.positionstack.com/v1/forward"):
#     """
#     > It takes an address and returns the latitude and longitude of that address

#      Arguments: 
#     --------------------------------   
#     :param address: The address you want to get the latitude and longitude for
#     :param access_key: This is the access key that you get from the website, defaults to
#     2e843c7ee44a8f52742a8168d0121a0a (optional)
#     :param URL: The URL of the API endpoint, defaults to http://api.positionstack.com/v1/forward
#     (optional)

#     Return
#     --------------------------------
#     A tuple of latitude and longitude
#     """
#     PARAMS = {'access_key': access_key, 'query': address}
#     r = requests.get(url = URL, params = PARAMS)
#     data = r.json()
#     return data['data'][0]['latitude'], data['data'][0]['longitude']

# def run_exps(X_train: pd.DataFrame , y_train: pd.DataFrame, X_test: pd.DataFrame, y_test: pd.DataFrame) -> pd.DataFrame:
#     """
#     > This function takes in training and test data, and then runs a bunch of models on it, returning a
#     dataframe of the results
    
#     Arguments: 
#     --------------------------------
#     :param X_train: training split 
#     :param y_train: training target vector
#     :param X_test: test split
#     :param y_test: test target vector

#     Types: 
#     --------------------------------
#     :type X_train: pd.DataFrame
#     :type y_train: pd.DataFrame
#     :type X_test: pd.DataFrame
#     :type y_test: pd.DataFrame

#     Return
#     --------------------------------
#     A dataframe of predictions
#     """
    
#     dfs = []

#     dt = DecisionTreeClassifier(max_depth=1)

#     models = [
#         ('LogReg', LogisticRegression()), 
#         ('RF', RandomForestClassifier()),
#         ('KNN', KNeighborsClassifier()),
#         ('GNB', GaussianNB()),
#         ('XGB', XGBClassifier()),
#         ('ADA', AdaBoostClassifier(base_estimator=dt))
#         ]
#     results = []
#     names = []

#     scoring = ['accuracy', 'precision_weighted', 'recall_weighted', 'f1_weighted', 'roc_auc']

#     target_names = ['malignant', 'benign']

#     for name, model in models:
#             kfold = model_selection.KFold(n_splits=5, shuffle=True, random_state=90210)
#             cv_results = model_selection.cross_validate(model, X_train, y_train, cv=kfold, scoring=scoring)
#             clf = model.fit(X_train, y_train)
#             y_pred = clf.predict(X_test)
#             print(name)
#             print(classification_report(y_test, y_pred, target_names=target_names))
            
#     results.append(cv_results)
#     names.append(name)
#     this_df = pd.DataFrame(cv_results)
#     this_df['model'] = name
#     dfs.append(this_df)
#     final = pd.concat(dfs, ignore_index=True)
#     return final
