In [None]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=54f73cd0c81028ec42706640fadcb91005db32090d75b65d443ec16c98452f3e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.sql import SparkSession
# Create (or reuse) ONE local SparkSession using all available cores.
# All builder options must be chained on a single builder: a second
# `getOrCreate()` returns the already-running session, so an app name set
# there never reaches the SparkContext.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SparkSQLSesion02")
    .getOrCreate()
)
# Render a DataFrame as a table when it is the last expression of a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DateType
from pyspark.sql.functions import to_date


# Explicit schema for the people dataset; every field is nullable.
# Fecha_Nacimiento is declared as a string on purpose: the raw records carry
# "yyyy-MM-dd" text, which is converted to a date later with to_date.
custom_schema = StructType([
    StructField("Nombre", StringType(), nullable=True),
    StructField("Edad", IntegerType(), nullable=True),
    StructField("Sexo", StringType(), nullable=True),
    StructField("Hobbies", ArrayType(StringType()), nullable=True),   # list of hobby names
    StructField("Fecha_Nacimiento", StringType(), nullable=True),
    StructField("Estado", StringType(), nullable=True),
])

In [None]:
import requests

# Fake people dataset; presumably a JSON list of records matching custom_schema.
url = "http://arcelia.net/datos_abiertos/datos_fake_personas/data_spark_100.json"
# A timeout prevents the cell from hanging forever on an unresponsive server.
response = requests.get(url, timeout=30)
# Fail fast on HTTP errors instead of trying to parse an error page as JSON.
response.raise_for_status()
data = response.json()

df = spark.createDataFrame(data, schema=custom_schema)
df.show()

+----------+----+---------+--------------------+----------------+--------------------+
|    Nombre|Edad|     Sexo|             Hobbies|Fecha_Nacimiento|              Estado|
+----------+----+---------+--------------------+----------------+--------------------+
|   Cynthia|  32| Femenino|        [Television]|      1990-09-24|             Sinaloa|
|      Iván|  50| Femenino|[Fotografía, Cine...|      1972-11-08|          Nuevo León|
|    Homero|  45|Masculino|[Fotografía, Pint...|      1978-02-12|           Querétaro|
|   Gustavo|  40|Masculino|     [Leer, Pintura]|      1983-02-24|             Morelos|
|    Amalia|  40| Femenino|[Música, Pintura,...|      1983-06-02|    Distrito Federal|
|    Nayeli|  23| Femenino|[Viajar, Videojue...|      2000-03-15|    Distrito Federal|
|     Jorge|  21| Femenino|[Videojuegos, Dep...|      2001-10-19|             Nayarit|
|   Daniela|  36| Femenino|[Cine, Television...|      1987-01-01|           Chihuahua|
|    Genaro|  50|Masculino|  [Pintura, Depo

In [None]:
# Turn the "yyyy-MM-dd" birth-date text into a DateType column, then
# print the resulting column types and the number of rows.
fecha_parseada = to_date(df.Fecha_Nacimiento, "yyyy-MM-dd")
df = df.withColumn("Fecha_Nacimiento", fecha_parseada)

print(df.dtypes)
print(df.count())

[('Nombre', 'string'), ('Edad', 'int'), ('Sexo', 'string'), ('Hobbies', 'array<string>'), ('Fecha_Nacimiento', 'date'), ('Estado', 'string')]
100


In [None]:
# Inspect how many partitions back the DataFrame's underlying RDD.
df.rdd.getNumPartitions()

17

In [None]:
# Count how many rows each partition holds. This is a row count, not a size in
# bytes. Counting inside mapPartitions streams each partition instead of
# materializing it as a list (as glom() would), and needs only one Spark job.
rows_per_partition = df.rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)]).collect()

for i, count in enumerate(rows_per_partition, start=1):
    print(f"Partición {i}: No. Renglones = {count}")

Partición 1: Tamaño: 33 bytes, No. Renglones = 33
Partición 2: Tamaño: 34 bytes, No. Renglones = 34
Partición 3: Tamaño: 33 bytes, No. Renglones = 33


In [None]:
# Redistribute the rows into exactly 3 partitions (Spark performs a shuffle).
# NOTE(review): df is reassigned in place. The previous cell's report prints 3
# partitions while getNumPartitions() printed 17, which suggests that report was
# re-run after this cell. Run the cells top to bottom to reproduce the outputs.
df = df.repartition(3)

In [None]:
# Mount Google Drive into the Colab runtime. The datasets read below and the
# outputs written later live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# List the sample datasets available in the Drive data folder.
! ls -la /content/drive/MyDrive/datos/spark/data

total 12
drwx------ 2 root root 4096 Jan 16 14:42 bike-data
drwx------ 4 root root 4096 Jan 16 14:43 deep-learning-images
drwx------ 7 root root 4096 Jan 16 14:43 flight-data


In [None]:
# Base Drive folder for inputs (data/...) and outputs (write/...).
# Keep the trailing "/": paths are built by plain string concatenation, e.g. ruta + "write/csv".
ruta = "/content/drive/MyDrive/datos/spark/"

In [None]:
# Read every CSV file in the folder.
# Read modes: PERMISSIVE (default, malformed fields become null),
# DROPMALFORMED (malformed rows are dropped), FAILFAST (raise on the first one).
# The option is "inferSchema"; a misspelled option name is silently ignored by
# Spark, which would leave every column typed as string.
df_csv = (
    spark.read.format("csv")
    .option("header", "true")
    .option("mode", "FAILFAST")
    .option("inferSchema", "true")
    .load(ruta + "data/flight-data/csv/")   # ruta already ends with "/"
)
print(df_csv.count())
# show() prints the table itself and returns None, so don't wrap it in print().
df_csv.show(5)

1757
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

None


In [None]:
# Read every line-delimited JSON file in the folder. The JSON source always
# infers the schema from the data, so no schema-inference option is needed.
df_json = (
    spark.read.format("json")
    .option("mode", "FAILFAST")
    .load(ruta + "data/flight-data/json/")   # ruta already ends with "/"
)
print(df_json.count())
# show() prints the table itself and returns None, so don't wrap it in print().
df_json.show(5)

1502
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows

None


In [None]:
! cat /content/drive/MyDrive/datos/spark/data/flight-data/parquet/2010-summary.parquet/*

In [None]:
# Generic reader: pick the source with format(...) and call load(path).
# Parquet stores its schema, so no header/inference options are needed.
df_parquet = (
    spark.read.format("parquet")
    .load(ruta + "data/flight-data/parquet/2010-summary.parquet")   # ruta already ends with "/"
)
print(df_parquet.count())
# show() prints the table itself and returns None, so don't wrap it in print().
df_parquet.show(5)

255
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

None


In [None]:
# Shortcut reader: spark.read.parquet(path) is equivalent to format("parquet").load(path).
df_parquet = spark.read.parquet(ruta + "data/flight-data/parquet/2010-summary.parquet")
print(df_parquet.count())
# show() prints the table itself and returns None, so don't wrap it in print().
df_parquet.show(5)

255
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

None


In [None]:
# Shortcut reader for ORC; like Parquet, ORC files carry their own schema.
df_orc = spark.read.orc(ruta + "data/flight-data/orc/2010-summary.orc")
print(df_orc.count())
# show() prints the table itself and returns None, so don't wrap it in print().
df_orc.show(5)

255
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

None


In [None]:
# Display df again. After the earlier repartition(3) the row order differs from
# the first df.show(), because the rows were redistributed across partitions.
df.show()

+---------+----+---------+--------------------+----------------+--------------------+
|   Nombre|Edad|     Sexo|             Hobbies|Fecha_Nacimiento|              Estado|
+---------+----+---------+--------------------+----------------+--------------------+
|    Jorge|  21| Femenino|[Videojuegos, Dep...|      2001-10-19|             Nayarit|
|   Nayeli|  23| Femenino|[Viajar, Videojue...|      2000-03-15|    Distrito Federal|
| Cristian|  32| Femenino|            [Bailar]|      1990-12-11|              Oaxaca|
|    Julio|  39|Masculino|[Leer, Deporte, B...|      1984-02-06|            Guerrero|
|Estefanía|  51| Femenino|[Cocinar, Bailar,...|      1971-12-23|           Querétaro|
|   Ángela|  40| Femenino|              [Cine]|      1983-03-30|             Yucatán|
|     Anel|  20| Femenino|[Cocinar, Leer, P...|      2003-09-04|          Guanajuato|
|    Lilia|  37| Femenino|           [Deporte]|      1986-03-15|Coahuila de Zaragoza|
|     Iván|  50| Femenino|[Fotografía, Cine...|      1

In [None]:
# Save the flights as ';'-separated CSV, replacing whatever is already at the target path.
df_orc.write.mode("overwrite").option("sep", ";").csv(ruta + "write/csv")

In [None]:
# Save the flights as JSON (one object per line), replacing any previous output.
df_orc.write.mode("overwrite").json(ruta + "write/json")

In [None]:
# Save the flights as Parquet, replacing any previous output.
df_orc.write.parquet(ruta + "write/parquet", mode="overwrite")

In [None]:
# Save the flights as ORC, replacing any previous output.
df_orc.write.orc(ruta + "write/orc", mode="overwrite")

In [None]:
# Demonstrate "append": add new part files next to those written by the
# overwrite above, so write/csv now holds the flights twice.
df_orc.write.csv(ruta + "write/csv", mode="append", sep=";")

In [None]:
# Same flights, split into one sub-folder per distinct `count` value (count=<value>/).
# "overwrite" keeps the cell idempotent: the target is on persistent Drive
# storage, so "append" would add another full copy on every re-run.
(
    df_orc.write.format("csv")
    .mode("overwrite")
    .option("sep", ";")
    .partitionBy("count")
    .save(ruta + "write/csvpart")
)

In [None]:
# Write people as ';'-separated CSV, one sub-folder per Sexo value (Sexo=<value>/).
# The partition column is encoded in the folder name rather than stored in the files.
# "overwrite" instead of "append" so re-running doesn't duplicate every person on Drive.
(
    df.select("Nombre", "Edad", "Fecha_Nacimiento", "Estado", "Sexo")
    .write.format("csv")
    .mode("overwrite")
    .option("sep", ";")
    .partitionBy("Sexo")
    .save(ruta + "write/personas/sexo")
)

In [None]:
# Write people as ';'-separated CSV, one sub-folder per Estado value (Estado=<value>/).
# "overwrite" instead of "append" so re-running doesn't duplicate every person on Drive.
(
    df.select("Nombre", "Edad", "Fecha_Nacimiento", "Sexo", "Estado")
    .write.format("csv")
    .mode("overwrite")
    .option("sep", ";")
    .partitionBy("Estado")
    .save(ruta + "write/personas/estado")
)

In [None]:
# Two-level partitioning: Estado=<value>/Sexo=<value>/ folders, each holding only
# Nombre;Edad;Fecha_Nacimiento (no header). The next cell reads this back.
# "overwrite" instead of "append" so a re-run doesn't make that read-back show duplicates.
(
    df.select("Nombre", "Edad", "Fecha_Nacimiento", "Estado", "Sexo")
    .write.format("csv")
    .mode("overwrite")
    .option("sep", ";")
    .partitionBy("Estado", "Sexo")
    .save(ruta + "write/personas/estado_sexo")
)

In [None]:
# Read back the Estado/Sexo-partitioned CSV. The files have no header, so the
# data columns arrive as _c0.._c2, all typed as string. Estado and Sexo are
# recovered from the Estado=.../Sexo=... folder names by partition discovery.
# selectExpr renames the columns and restores the original types in one step.
df_es = (
    spark.read.format("csv")
    .option("sep", ";")
    .load(ruta + "write/personas/estado_sexo")
    .selectExpr(
        "_c0 AS Nombre",
        "CAST(_c1 AS INT) AS Edad",
        "to_date(_c2, 'yyyy-MM-dd') AS Fecha_Nacimiento",
        "Estado",
        "Sexo",
    )
)
df_es

Nombre,Edad,Fecha_Nacimiento,Estado,Sexo
Isabel,40,1983-03-09,Zacatecas,Femenino
Guadalupe,43,1980-06-03,Zacatecas,Femenino
Angélica,29,1993-10-08,Zacatecas,Femenino
Pamela,41,1982-04-23,Tabasco,Femenino
Josefina,56,1967-05-15,Tabasco,Femenino
Adela,50,1972-11-23,Tabasco,Femenino
Lilia,37,1986-03-15,Coahuila de Zaragoza,Femenino
Linda,52,1971-03-18,Coahuila de Zaragoza,Femenino
Arcelia,34,1988-10-01,Coahuila de Zaragoza,Femenino
María José,38,1985-04-25,Aguascalientes,Femenino
