# PySpark

In [None]:
import findspark
findspark.init() 

from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used throughout this section.
spark = (
    SparkSession.builder
    .appName('CausalNex')
    .getOrCreate()
)

# Location of the semicolon-separated student dataset.
path = '../datasets/student-por.csv'

# Read with a header row and let Spark infer the column types.
data = spark.read.csv(path, sep=';', header=True, inferSchema=True)

# Preview the first five rows.
data.show(5)

In [None]:
# Columns that are removed before the analysis.
drop_cols = [
    'school',
    'sex',
    'age',
    'Mjob',
    'Fjob',
    'reason',
    'guardian',
]

# Keep every remaining column, in its original order.
data = data.drop(*drop_cols)

data

In [None]:
# data.dtypes yields (column name, type string) pairs; keep the string-typed names.
non_numeric_columns = [
    name
    for name, type_name in data.dtypes
    if type_name.startswith('string')
]
non_numeric_columns

In [None]:
# Select every column into a separate DataFrame name so the steps below
# rebind struct_data and leave `data` untouched.
struct_data=data.select('*')
struct_data

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType


# Encode every string column as a numeric index; each output column is the
# input name with a trailing underscore, and labels are ordered alphabetically.
indexers=StringIndexer(inputCols=non_numeric_columns,
                       outputCols=[c+'_' for c in non_numeric_columns],
                       stringOrderType='alphabetAsc')

struct_data=indexers.fit(struct_data).transform(struct_data)

# Drop the original string columns, keeping only numeric and indexed ones.
struct_data=struct_data.select([c for c in struct_data.columns if c not in non_numeric_columns])

# Cast every column to integer in ONE projection. Calling withColumn once per
# column in a loop adds a projection per call and can produce very large
# query plans; a single select with aliased casts gives the same result.
struct_data=struct_data.select([struct_data[c].cast(IntegerType()).alias(c)
                                for c in struct_data.columns])


# Collect the result to the driver as a pandas DataFrame for display.
struct_data.toPandas()

# Multiprocessing

In [None]:
def square(x):
    """Return ``x`` raised to the second power."""
    return pow(x, 2)

In [None]:
# One million consecutive integers, starting at zero.
data = list(range(1000000))

In [None]:
%%time
# Baseline timing: square every element with a list comprehension.
seq=[square(x) for x in data]
seq[:10]

In [None]:
import multiprocessing as mp
import numpy as np

In [None]:
# Number of CPUs reported for this machine; used below to size the pool.
mp.cpu_count()

In [None]:
%%time
pool=mp.Pool(mp.cpu_count())   # pool with one worker per reported CPU

# Apply square to every element of data through the pool.
res=pool.map(square, data)
pool.close()
res[:10]

In [None]:
# Same computation written as an explicit loop with append.
res=[]
for e in data:
    res.append(e**2)
res[:10]

In [None]:
# Same computation using the built-in map; show the first ten results.
list(map(square, data))[:10]

In [None]:
# 200 rows x 5000 columns of random integers in [0, 10), then as nested lists.
array = np.random.randint(low=0, high=10, size=(200, 5000))
data = array.tolist()

In [None]:
def rango(r, mini, maxi):
    """Count the elements of ``r`` that lie inside ``[mini, maxi]``.

    Args:
        r: Iterable of values comparable with ``mini`` and ``maxi``.
        mini: Inclusive lower bound.
        maxi: Inclusive upper bound.

    Returns:
        Number of elements ``e`` in ``r`` with ``mini <= e <= maxi``.
    """
    # Sum one per match; assigning ``+1`` on each match (instead of adding)
    # would cap the result at 1 however many elements match.
    return sum(1 for e in r if mini <= e <= maxi)

In [None]:
# Sequential timing: count the values between 4 and 8 (inclusive) in each row.
%time res=[rango(e, 4, 8) for e in data]
res[:10]

In [None]:
pool=mp.Pool()

%time res=pool.starmap(rango, [rango(e, 4, 8) for e in data])
pool.close()

In [None]:
def rango2(i, r, mini, maxi):
    """Count the elements of ``r`` inside ``[mini, maxi]``, tagged with ``i``.

    Args:
        i: Identifier returned unchanged alongside the count (the caller can
            use it to restore ordering).
        r: Iterable of values comparable with ``mini`` and ``maxi``.
        mini: Inclusive lower bound.
        maxi: Inclusive upper bound.

    Returns:
        Tuple ``(i, count)`` where ``count`` is the number of elements ``e``
        in ``r`` with ``mini <= e <= maxi``.
    """
    # Sum one per match; assigning ``+1`` on each match (instead of adding)
    # would cap the count at 1 however many elements match.
    count = sum(1 for e in r if mini <= e <= maxi)

    return i, count

In [None]:
# Module-level accumulator filled by the callback below.
res = []


def colecta(resul):
    """Append ``resul`` to the module-level ``res`` list."""
    # Appending mutates the list in place, so no ``global`` statement is needed.
    res.append(resul)

In [None]:
# Pool with three worker processes.
pool=mp.Pool(3)

# Submit one asynchronous task per row; each result is handed to colecta,
# which appends it to the global res list.
for i,e in enumerate(data):
    pool.apply_async(rango2, args=(i,e,4,8), callback=colecta)
    
# Stop accepting new tasks, then wait for all submitted tasks to finish.
pool.close()
pool.join()

# Results may arrive out of order; sort by the row index that rango2 returns.
res.sort(key=lambda x: x[0])

res[:10]

In [None]:
import pandas as pd

In [None]:
# Small 5 x 2 frame of random integers in [3, 10).
df = pd.DataFrame(np.random.randint(low=3, high=10, size=(5, 2)))
df

In [None]:
def hipo(c):
    """Return the square root of ``c[1]**2 + c[2]**2``.

    ``c`` is indexed at positions 1 and 2; position 0 is not used.
    """
    leg_a, leg_b = c[1], c[2]
    squared_sum = leg_a ** 2 + leg_b ** 2
    return squared_sum ** 0.5

In [None]:
# Run hipo over the rows of df with three workers. itertuples(name=None)
# yields plain tuples whose first element is the index, which is why hipo
# reads positions 1 and 2.
with mp.Pool(3) as pool:
    res=pool.imap(hipo, df.itertuples(name=None), chunksize=10)
    # Consume the lazy imap iterator while the pool is still open.
    out=[round(e, 2) for e in res]
out

# Pathos

In [None]:
from pathos.multiprocessing import ProcessingPool as Pool

In [None]:
# Show the current contents of df.
df

In [None]:
def fun(df):
    """Return the ``shape`` attribute of ``df``."""
    return df.shape

In [None]:
# One chunk (and one worker) per reported CPU.
cores=mp.cpu_count()

# Split df row-wise into `cores` pieces.
df_split=np.array_split(df, cores, axis=0)

pool=Pool(cores)

# Apply fun to every piece and stack the returned shapes into one frame.
df_out=pd.DataFrame(np.vstack(pool.map(fun, df_split)))

# Shut the pool down and clear it.
# NOTE(review): clear() presumably discards pathos' cached pool so a later
# Pool(...) starts fresh -- confirm against the pathos documentation.
pool.close()
pool.join()
pool.clear()

In [None]:
df_out

# Dask

In [None]:
# !pip3 install "dask[complete]"
# !pip3 install dask_ml

In [None]:
import os
import dask
import dask.dataframe as dd

In [None]:
# Path pattern containing a '*' wildcard rather than a single file name.
archivos=os.path.join('../datasets', 'accounts.*.csv')
print(archivos)

In [None]:
# Build a Dask DataFrame from the pattern and show its first rows.
df=dd.read_csv(archivos)
df.head()

In [None]:
# Read the nycflights CSV files, combining columns 0, 1 and 2 into a single
# parsed 'Date' column.
df=dd.read_csv(os.path.join('../datasets', 'nycflights', '*.csv'),
               parse_dates={'Date':[0,1,2]})
df.head()

In [None]:
# NOTE(review): the next cell re-reads the data with explicit dtypes, so this
# tail() presumably demonstrates a dtype-inference failure -- confirm.
df.tail()

In [None]:
# Same read, but with explicit dtypes for three columns.
df=dd.read_csv(os.path.join('../datasets', 'nycflights', '*.csv'),
               parse_dates={'Date':[0,1,2]},
               dtype={'TailNum': str, 'CRSElapsedTime':float, 'Cancelled':bool})


df.tail()

In [None]:
# Column summary including memory usage.
df.info(memory_usage='deep')

In [None]:
df.shape

In [None]:
# Build the max-of-DepDelay expression, then evaluate it with compute().
df.DepDelay.max().compute()

In [None]:
# Render the task graph of the same expression to an image file.
df.DepDelay.max().visualize(filename='../images/max_dask.png')

In [None]:
# Total number of rows.
len(df)

In [None]:
len(df[~df.Cancelled])    # tilde ==> negation (like a logical NOT)

In [None]:
# Mean departure delay per origin.
df.groupby('Origin').DepDelay.mean().compute()

In [None]:
# Keep only the rows whose Cancelled flag is False.
no_cancelado=df[~df.Cancelled]

# Build (but do not yet evaluate) the mean and standard deviation of DepDelay.
media_delay=no_cancelado.DepDelay.mean()
std_delay=no_cancelado.DepDelay.std()

In [None]:
# Displays the unevaluated Dask object, not a number.
std_delay

In [None]:
# Time each result evaluated on its own...
%time media_delay.compute()

In [None]:
%time std_delay.compute()

In [None]:
# ...versus both evaluated in a single dask.compute call.
%time media_delay_res, std_delay_res = dask.compute(media_delay, std_delay)

In [None]:
media_delay_res

In [None]:
std_delay_res

In [None]:
type(std_delay_res)

In [None]:
# Render the combined task graph of both results to an image file.
dask.visualize(media_delay, std_delay, filename='../images/std_dask.png')

# HDF5 y Parquet  (formatos de archivo)

In [None]:
# Time building the DataFrame from the CSV pattern.
%time df_csv=dd.read_csv(archivos)
df_csv.head()

In [None]:
# Output path for the HDF file.
target=os.path.join('../datasets', 'accounts.h5')

In [None]:
# NOTE(review): key='../datasets' looks like a filesystem path rather than an
# HDF node name -- confirm this is the intended key.
%time df_csv.to_hdf(target, key='../datasets')

In [None]:
# Read the data back from the HDF file using the same key.
%time df_h=dd.read_hdf(target, key='../datasets')
df_h.head()

In [None]:
df_h.info(memory_usage='deep')

In [None]:
# Time a full-column sum on the HDF-backed frame.
%time df_h.amount.sum().compute()

In [None]:
# parquet
# !pip3 install fastparquet

In [None]:
# Output path for the Parquet dataset.
target=os.path.join('../datasets', 'accounts.parquet')

# Convert 'names' to a categorical column, then write with fastparquet.
# NOTE(review): 'has_nulls' is passed through storage_options here -- confirm
# it reaches the writer as intended with the installed Dask version.
df_csv.categorize(columns=['names']).to_parquet(target,
                                                storage_options={'has_nulls':True},
                                                engine='fastparquet')

In [None]:
# Time building the DataFrame from the Parquet dataset.
%time df_p=dd.read_parquet(target)
df_p.head()

In [None]:
# Compare the reported memory usage of the CSV- and Parquet-backed frames.
df_csv.info(memory_usage='deep')

In [None]:
df_p.info(memory_usage='deep')

In [None]:
# Compare the time of the same column sum on both frames.
%time df_csv.amount.sum().compute()

In [None]:
%time df_p.amount.sum().compute()

# tqdm

In [None]:
# !pip3 install tqdm

In [None]:
from tqdm import trange

In [None]:
import time
from tqdm.notebook import tqdm

In [None]:
# Nested progress bars: outer loop of 2 steps, inner loop of 5 half-second steps.
for i in tqdm(range(2), desc='Primer loop'):
    # NOTE(review): clears tqdm's private instance registry, presumably to
    # avoid leftover bars being redrawn in the notebook -- confirm.
    tqdm._instances.clear()
    for j in tqdm(range(5), desc='Segundo loop'):
        time.sleep(.5)

In [None]:
# Same nesting using trange(n) in place of tqdm(range(n)).
for i in trange(2):
    tqdm._instances.clear()
    for j in trange(5):
        time.sleep(.5)

In [None]:
# 10000 x 1000 frame of random integers in [0, 1e8).
df=pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.head()

In [None]:
# Register the progress_apply methods on pandas objects.
tqdm.pandas()

# Group by column 0 and square each group while showing a progress bar.
df.groupby(0).progress_apply(lambda x: x**2)

In [None]:
from dask import delayed, compute
from dask.diagnostics import ProgressBar

In [None]:
# Collects the delayed (not yet executed) calls.
res=[]

def fun(x):
    """Sleep for ``x`` seconds, then return ``x``."""
    time.sleep(x)
    return x

# Wrapping fun with delayed only records the call; nothing sleeps here.
for i in trange(10):
    res.append(delayed(fun)(i))
    
# Execute all recorded calls while displaying Dask's progress bar.
with ProgressBar():
    compute(res)

In [None]:
# Evaluate a single delayed call on its own.
res[5].compute()

# Vaex

https://towardsdatascience.com/how-to-analyse-100s-of-gbs-of-data-on-your-laptop-with-python-f83363dda94

**Data:** https://vaex.s3.us-east-2.amazonaws.com/taxi/yellow_taxi_2009_2015_f32.hdf5

In [None]:
import vaex

import numpy as np
import pylab as plt
import seaborn as sns

import warnings
warnings.filterwarnings('ignore')

In [None]:
%%time
# file size
!ls -l -h ../../data/yellow_taxi_2009_2015_f32.hdf5

In [None]:
%%time
# open the file
df = vaex.open('../../data/yellow_taxi_2009_2015_f32.hdf5')

In [None]:
%%time
# Summary statistics of the columns.
df.describe()

In [None]:
%%time
# Interactive plot of pickup longitude against pickup latitude on a 512-bin
# grid, with a log1p transform and the 'plasma' colormap.
df.plot_widget(df.pickup_longitude, 
               df.pickup_latitude, 
               shape=512, 
               limits='minmax',
               f='log1p', 
               colormap='plasma')