# Ejercicio Práctico Solución de Koalas

* pip install findspark
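* pip install "pyspark>=3.2"  (este notebook importa `pyspark.pandas`, que solo existe a partir de PySpark 3.2; la versión 2.4.4 no lo incluye)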
* pip install koalas
* pip install plotly
* pip install "nbformat>=4.2.0"  (entre comillas, para que el shell no interprete `>` como una redirección)
* conda install -n bigdata ipykernel --update-deps --force-reinstall
* pip install matplotlib

In [1]:
import os
import sys

# Environment prepared before Spark is imported/started:
#   PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON -> same Python interpreter for driver and executors
#   PYARROW_IGNORE_TIMEZONE                -> Arrow/time-zone setting (avoids the warning)
#   SPARK_LOCAL_IP                         -> force local loopback (mitigates VPN/firewall/odd hostnames)
os.environ.update({
    "PYSPARK_PYTHON": sys.executable,
    "PYSPARK_DRIVER_PYTHON": sys.executable,
    "PYARROW_IGNORE_TIMEZONE": "1",
    "SPARK_LOCAL_IP": "127.0.0.1",
})

from pyspark.sql import SparkSession

# Session settings, applied in this order to the builder below.
spark_conf = {
    # Propagate the same Python interpreter to the executor
    "spark.executorEnv.PYSPARK_PYTHON": sys.executable,
    # Propagate the Arrow variable to the executor
    "spark.executorEnv.PYARROW_IGNORE_TIMEZONE": "1",
    # Bind and advertise the driver on localhost (avoids timeouts with VPN/firewall)
    "spark.driver.bindAddress": "127.0.0.1",
    "spark.driver.host": "127.0.0.1",
    # (Optional but useful) pin the Spark session time zone
    "spark.sql.session.timeZone": "UTC",
}

builder = SparkSession.builder.appName("PoS-stable")
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

# Minimal smoke test: build a tiny pandas-on-Spark frame and fetch its first rows
import pyspark.pandas as ps
ps.range(10).head(3)



Unnamed: 0,id
0,0
1,1
2,2


In [2]:
import findspark
# NOTE(review): findspark.init() is normally called before the first pyspark import,
# but in this notebook the previous cell already imports pyspark and creates the
# session, so this call presumably has no effect here — confirm, then move it to
# the top of the notebook or drop it.
findspark.init()

import pandas as pd
import pyspark

In [3]:
import numpy as np
import pandas as pd
import pyspark.pandas as ks

### Funciones básicas

In [4]:
# pandas Series named "s" containing one missing value (NaN)
pser = pd.Series(data=[1, 3, 5, np.nan, 6, 8], name="s")

# Same data converted to a pandas-on-Spark Series
kser = ks.from_pandas(pser)



In [5]:
# Show the series ordered by its index labels.
kser.sort_index()

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
Name: s, dtype: float64

In [6]:
# Seeded generator: without a fixed seed every rerun produces different numbers,
# so the outputs recorded further down the notebook stop matching each other.
rng = np.random.default_rng(42)

# Two columns of five uniform random values each.
pdf = pd.DataFrame({'A': rng.random(5),
                    'B': rng.random(5)})

# pandas-on-Spark copy of the same frame, used by the cells that follow.
kdf = ks.from_pandas(pdf)

### Visualización de datos

In [7]:
# Summary statistics per column (count, mean, std, min, quartiles, max).
kdf.describe()

Unnamed: 0,A,B
count,5.0,5.0
mean,0.422161,0.624287
std,0.205652,0.360052
min,0.158333,0.090523
25%,0.257626,0.463698
50%,0.502532,0.682976
75%,0.543788,0.907633
max,0.648526,0.976607


In [8]:
# Rows ordered by the values in column 'B'.
kdf.sort_values(by='B')

Unnamed: 0,A,B
4,0.648526,0.090523
0,0.502532,0.463698
3,0.257626,0.682976
2,0.158333,0.907633
1,0.543788,0.976607


In [None]:
# Transposed frame: rows become columns and columns become rows.
kdf.transpose()

### Selección

In [None]:
# Select columns 'A' and 'B' by name.
kdf[['A', 'B']]

Unnamed: 0,A,B
0,0.450976,0.781794
1,0.062639,0.511312
2,0.546861,0.458413
3,0.099604,0.78443
4,0.097036,0.077913


In [None]:
# Label-based row slice; both end labels (1 and 2) are included in the result.
kdf.loc[1:2]

Unnamed: 0,A,B
1,0.062639,0.511312
2,0.546861,0.458413


In [None]:
# Position-based selection: first three rows, second column only (end positions excluded).
kdf.iloc[:3, 1:2]

Unnamed: 0,B
0,0.781794
1,0.511312
2,0.458413


### Aplicando funciones de Python a Koalas

In [None]:
# Apply a NumPy function to each column; here it yields a running total down every column.
# NOTE(review): the pandas-on-Spark documentation warns that apply() may call the
# function on separate batches of a column, so a cumulative function could give
# different results on larger frames — confirm, or prefer kdf.cumsum().
kdf.apply(np.cumsum)



Unnamed: 0,A,B
0,0.450976,0.781794
1,0.513615,1.293105
2,1.060477,1.751518
3,1.16008,2.535948
4,1.257117,2.613861


In [None]:
# Apply a Python lambda that squares every value.
kdf.apply(lambda x: x ** 2)



Unnamed: 0,A,B
0,0.20338,0.611201
1,0.003924,0.26144
2,0.299057,0.210143
3,0.009921,0.61533
4,0.009416,0.00607


### Agrupando datos

In [None]:
# Group by both columns and sum each group. Because 'A' and 'B' are both grouping
# keys, no value columns remain, so the result only carries the group index.
kdf.groupby(['A', 'B']).sum()



A,B
0.450976,0.781794
0.062639,0.511312
0.546861,0.458413
0.099604,0.78443
0.097036,0.077913


### Generando gráficos

In [None]:
# This is needed for visualizing plot on notebook
%matplotlib inline

In [None]:
# One speed and one lifespan figure per animal; the animal names become the row index.
speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]
index = ['snail', 'pig', 'elephant', 'rabbit', 'giraffe', 'coyote', 'horse']

kdf = ks.DataFrame(dict(speed=speed, lifespan=lifespan), index=index)

# Horizontal bar chart of both columns.
kdf.plot.barh()

ImportError: plotly is required for plotting when the default backend 'plotly' is selected.

In [None]:
# Date index generated between the two bounds with frequency 'M'.
monthly_index = pd.date_range(start='2019/08/15', end='2020/03/09', freq='M')

# Three numeric series sharing that date index.
monthly_metrics = {
    'sales': [3, 2, 3, 9, 10, 6, 3],
    'signups': [5, 5, 6, 12, 14, 13, 9],
    'visits': [20, 42, 28, 62, 81, 50, 90],
}
kdf = ks.DataFrame(monthly_metrics, index=monthly_index)

# Area chart of the three series.
kdf.plot.area()

ImportError: plotly is required for plotting when the default backend 'plotly' is selected.

### Utilizando SQL en Koalas

In [None]:
# pandas-on-Spark frame with one row per year and counts for two animals.
kdf = ks.DataFrame(dict(
    year=[1990, 1997, 2003, 2009, 2014],
    pig=[20, 18, 489, 675, 1776],
    horse=[4, 25, 281, 600, 1900],
))

In [None]:
# Query the frame with SQL; the {kdf} placeholder in the query text is bound to
# the frame passed as the `kdf` keyword argument.
ks.sql("SELECT * FROM {kdf} WHERE pig > 100", kdf=kdf)


Unnamed: 0,year,pig,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


In [None]:
# Plain pandas frame, built row by row: (year, sheep, chicken).
_livestock_rows = [
    (1990, 22, 250),
    (1997, 50, 326),
    (2003, 121, 589),
    (2009, 445, 1241),
    (2014, 791, 2118),
]
pdf = pd.DataFrame(_livestock_rows, columns=['year', 'sheep', 'chicken'])

In [None]:
# Inner join between the pandas-on-Spark frame ({kdf}) and the plain pandas
# frame ({pdf}) on `year`, keeping one column from each side.
join_query = '''
    SELECT ks.pig, pd.chicken
    FROM {kdf} ks
    INNER JOIN {pdf} pd
    ON ks.year = pd.year
    ORDER BY ks.pig, pd.chicken
'''
result = ks.sql(join_query, kdf=kdf, pdf=pdf)

print(result)


    pig  chicken
0    18      326
1    20      250
2   489      589
3   675     1241
4  1776     2118


### Trabajando con PySpark

In [None]:
# Small pandas-on-Spark frame with two integer columns.
kdf = ks.DataFrame(dict(A=[1, 2, 3, 4, 5], B=[10, 20, 30, 40, 50]))

# Convert it to a Spark DataFrame and show the resulting type.
sdf = kdf.to_spark()
type(sdf)



pyspark.sql.dataframe.DataFrame

In [None]:
# Print the Spark DataFrame as a text table.
sdf.show()

+---+---+
|  A|  B|
+---+---+
|  1| 10|
|  2| 20|
|  3| 30|
|  4| 40|
|  5| 50|
+---+---+

