<a href="https://colab.research.google.com/github/ECV21/PySpark/blob/main/Quickstart_DataFrame.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=bcece967697ddb3a3f13f0f1ddd094f6529ce0c221110464d4bb5fcf664da383
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate() #iniciar sesión de Spark usando PySpark

# DataFrame Creation


In [7]:
#Crear dataframe con función createDataFrame()

from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)), #cada objeto ROW es una fila en DataFrame
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

# Create a PySpark DataFrame with an explicit schema

In [6]:
#crear un dataframe utilizando una lista de tuplas, en vez de utlizar la clase ROW

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

# Create a PySpark DataFrame from pandas DataFrame

In [7]:
#crear un dataframe con Pandas y convertirlo en un DataFrame de PySpark

pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [8]:
#ver dataframe
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [9]:
#ver schema

df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



# View data

In [10]:
#ver solo 2 filas
df.show(2)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 2 rows



In [11]:
#Habilitar "eargerEval" en Spark para hacer que las operaciones sobre del dataFrame se evaluen automaticamente

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [12]:
#Ver filas en vertical

df.show(2, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
-RECORD 1------------------
 a   | 2                   
 b   | 3.0                 
 c   | string2             
 d   | 2000-02-01          
 e   | 2000-01-02 12:00:00 
only showing top 2 rows



In [13]:
# You can see the DataFrame's schema and columns names

df.columns

['a', 'b', 'c', 'd', 'e']

In [14]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



# Show the summary of the DataFrame

In [15]:
df.select("a", "b", "c").describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   NULL|
| stddev|1.0|1.0|   NULL|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



# Recuperar todas las filas de un df y verlo como lista de row

In [16]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

In [17]:
df.take(1) #para evitar error cuando datos son enormes

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

# Back to a pandas DataFrame

In [18]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,4.0,string3,2000-03-01,2000-01-03 12:00:00


# Selecting and accesing Data

In [19]:
df.a

Column<'a'>

In [21]:
#demostrar que la comparación son de tipo "pyspark.sql.column", por eso será TRUE

from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

In [22]:
#Seleccionar columna c del dataframe df y mostrarlo

df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



In [24]:
# Ver más de dos columnas

df.select(df.a, df.c).show()

+---+-------+
|  a|      c|
+---+-------+
|  1|string1|
|  2|string2|
|  3|string3|
+---+-------+



In [25]:
#añadir nueva columnas, la nueva columna"upper_c" serpa las mayúsculas de la columna c

#el método WITHCOLUMN añade nueva columna o modifica una columna existente

df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [26]:
#Filtrar un DataFrame y mostrar solo las ROWs con valor igual a1

df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



In [30]:
#filtra y mostrar columnas donde la columna a tenga filas = 1 o =3

df.filter((df.a == 1) | (df.a ==3)).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



# Applying a Function

Una UDF (User-Defined Function) es una función definida por el usuario que extiende las capacidades nativas de un sistema como Apache Spark o bases de datos como SQL. En el contexto de PySpark, una UDF permite que los usuarios definan sus propias funciones personalizadas para aplicar transformaciones complejas a los datos que no están cubiertas por las funciones integradas de Spark.

Las UDFs son útiles cuando las funciones nativas de Spark no cubren todas las necesidades. Puedes crear una UDF para:

Realizar transformaciones complejas sobre una columna o varias columnas de un DataFrame.
Usar lógica de Python personalizada en las columnas de un DataFrame que no esté disponible en las funciones predefinidas de PySpark.
Manipular datos de manera más flexible, como aplicar una lógica condicional, transformar cadenas, números o realizar cálculos personalizados.

In [32]:
#Usar un Pandas UDFs en Pyspark para agregar 1 a los valores de una columna

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long') #definir una función Pandas UDF, la función permite usar operaciona basadas en pandas dentro de pyspark
def pandas_plus_one(series: pd.Series) -> pd.Series: #definir una función que toma como argumento una Serie de Pandas
    return series + 1 #retorna una serie de Pandas donde aplica operación de +1

df.select(pandas_plus_one(df.a)).show() #aplicar la función creada a la columna A de nuestro datafram

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+



In [22]:
# usar el método "mapInPandas" de PySpark para aplicar funciones de Pandas a un df de spark y devolver df procesado

def pandas_filter_func(iterator): #la función toma un "iterador" que procesa particiones del df de PySpark
    for pandas_df in iterator: #para cada partición converitirlo en un datafram de pandas
        yield pandas_df[pandas_df.a == 1] #filtrar registros donde columna a tiene valor 1

df.mapInPandas(pandas_filter_func, schema=df.schema).show() #método mapInPandas opera sobre particiones de dataframe, cada
  #partición se convierte en un df de pandasm luego devuelve un df de spark

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



# Grouping Data

In [23]:
df = spark.createDataFrame([
    ['red', 'banana', 1,10], ['blue', 'banana', 2,20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4,40], ['red', 'carrot', 5,50], ['black', 'carrot', 6,60],
    ['red', 'banana', 7,70], ['red', 'grape', 8,80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()


+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [24]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



Aplicar la funcion nativa de python usando Pandas API

In [25]:
def plus_mean(pandas_df):
  return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



Co-grouping and applying a function

In [26]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def merge_ordered(l, r):
    return pd.merge_ordered(l, r)

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    merge_ordered, schema='time int, id int, v1 double, v2 string').show()

+--------+---+---+----+
|    time| id| v1|  v2|
+--------+---+---+----+
|20000101|  1|1.0|   x|
|20000102|  1|3.0|NULL|
|20000101|  2|2.0|   y|
|20000102|  2|4.0|NULL|
+--------+---+---+----+



# Getting Data in/out

In [27]:
#Escribir y leer archivo csv

df.write.csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/content/foo.csv already exists. Set mode as "overwrite" to overwrite the existing path.

In [10]:
# Escribir y leer archivo Parquet

df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [21]:
# leer y escribir archivo ORC

df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/content/zoo.orc already exists. Set mode as "overwrite" to overwrite the existing path.

# Working with SQL

In [28]:
df.createOrReplaceTempView("tableA") #crear vista temporal de DataFrame
spark.sql("SELECT count(*) from tableA").show() #ejecutar consulta SQL sobre vista temporal de tablaA

+--------+
|count(1)|
+--------+
|       8|
+--------+



In [29]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

# Podemos invocar UDFs para realizar SQL


@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
# Change 'v1' to an actual column name from your DataFrame (e.g., 'a')
spark.sql("SELECT add_one(v1) FROM tableA").show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



In [31]:
from pyspark.sql.functions import expr

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+

