**Instalar JDK, SPARK, and HADOOP3+Scala**

In [1]:
!apt-get install openjdk-17-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3-scala2.13.tgz

In [3]:
# Descomprimir
!tar -xf spark-3.5.0-bin-hadoop3-scala2.13.tgz

In [4]:
!ls -af

..	 .				    spark-3.5.0-bin-hadoop3-scala2.13.tgz
.config  spark-3.5.0-bin-hadoop3-scala2.13  sample_data


In [5]:
# Variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = '/content/spark-3.5.0-bin-hadoop3-scala2.13'

In [6]:
# Instalar findspark
!pip install -q findspark

**Librerias**

In [7]:
import findspark
findspark.init()

In [8]:
from pyspark.sql import SparkSession

**Crear spark session**

In [9]:
spark = SparkSession.builder.appName('firstSession')\
                    .config('spark.master', 'local[4]')\
                    .config('spark.shuffle.sql.partitions', 1)\
                    .getOrCreate()

**Crear tabla a partir de una lista**

In [11]:
columnas = ['id', 'nombre', 'edad']
lista = [
    (1, 'Pedro', 30),
    (2, 'Juan', 20),
    (3, 'Juliana', 25),
    (4, 'Nicol', 24),
    (5, 'Daniela', 28)
]

In [12]:
df_1 = spark.createDataFrame(lista, schema=columnas)

In [13]:
df_1.columns

['id', 'nombre', 'edad']

In [14]:
df_1.printSchema()

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad: long (nullable = true)



In [15]:
df_1.show()

+---+-------+----+
| id| nombre|edad|
+---+-------+----+
|  1|  Pedro|  30|
|  2|   Juan|  20|
|  3|Juliana|  25|
|  4|  Nicol|  24|
|  5|Daniela|  28|
+---+-------+----+



In [16]:
df_1.describe().show()

+-------+------------------+-------+-----------------+
|summary|                id| nombre|             edad|
+-------+------------------+-------+-----------------+
|  count|                 5|      5|                5|
|   mean|               3.0|   NULL|             25.4|
| stddev|1.5811388300841898|   NULL|3.847076812334269|
|    min|                 1|Daniela|               20|
|    max|                 5|  Pedro|               30|
+-------+------------------+-------+-----------------+



In [17]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [18]:
schema_1 = StructType([
    StructField('id', IntegerType(), True),
    StructField('nombre', StringType(), True),
    StructField('edad', IntegerType(), True)
])

In [19]:
df_1 = spark.createDataFrame(lista, schema=schema_1)

In [20]:
df_1.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad: integer (nullable = true)



**Crear tabla a partir de un CSV**

In [10]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [22]:
df = spark.read.csv("/content/drive/MyDrive/Cursos/Hadoop y Spark/Data/business.csv", sep=',', header=True)

In [24]:
df.count()

1936

In [27]:
df.show(10, truncate=False)

+----------------+-------+----------+----------+------+-------+---------+------------------------------+------------------------------+------------------------+--------------------+--------------+--------------+--------------+
|Series_reference|Period |Data_value|Suppressed|STATUS|UNITS  |Magnitude|Subject                       |Group                         |Series_title_1          |Series_title_2      |Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+------------------------------+------------------------------+------------------------+--------------------+--------------+--------------+--------------+
|BDCQ.SF1AA2CA   |2016.06|1116.386  |NULL      |F     |Dollars|6        |Business Data Collection - BDC|Industry by financial variable|Sales (operating income)|Forestry and Logging|Current prices|Unadjusted    |NULL          |
|BDCQ.SF1AA2CA   |2016.09|1070.874  |NULL      |F     |Dollars|6        |Business Data Colle

In [28]:
df.printSchema()

root
 |-- Series_reference: string (nullable = true)
 |-- Period: string (nullable = true)
 |-- Data_value: string (nullable = true)
 |-- Suppressed: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- UNITS: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Series_title_1: string (nullable = true)
 |-- Series_title_2: string (nullable = true)
 |-- Series_title_3: string (nullable = true)
 |-- Series_title_4: string (nullable = true)
 |-- Series_title_5: string (nullable = true)



**Guardar dataframe**

In [29]:
df.write.parquet("/content/drive/MyDrive/Cursos/Hadoop y Spark/Data/parquet_example", mode='overwrite')

**Crear tabla a partir de un parquet**

In [11]:
df_p = spark.read.parquet("/content/drive/MyDrive/Cursos/Hadoop y Spark/Data/parquet_example")

In [12]:
df_p.printSchema()

root
 |-- Series_reference: string (nullable = true)
 |-- Period: string (nullable = true)
 |-- Data_value: string (nullable = true)
 |-- Suppressed: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- UNITS: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Series_title_1: string (nullable = true)
 |-- Series_title_2: string (nullable = true)
 |-- Series_title_3: string (nullable = true)
 |-- Series_title_4: string (nullable = true)
 |-- Series_title_5: string (nullable = true)



In [13]:
df_p.show(10, truncate=False)

+----------------+-------+----------+----------+------+-------+---------+------------------------------+------------------------------+------------------------+--------------------+--------------+--------------+--------------+
|Series_reference|Period |Data_value|Suppressed|STATUS|UNITS  |Magnitude|Subject                       |Group                         |Series_title_1          |Series_title_2      |Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+------------------------------+------------------------------+------------------------+--------------------+--------------+--------------+--------------+
|BDCQ.SF1AA2CA   |2016.06|1116.386  |NULL      |F     |Dollars|6        |Business Data Collection - BDC|Industry by financial variable|Sales (operating income)|Forestry and Logging|Current prices|Unadjusted    |NULL          |
|BDCQ.SF1AA2CA   |2016.09|1070.874  |NULL      |F     |Dollars|6        |Business Data Colle

In [14]:
df_p.describe().show()

+-------+----------------+------------------+------------------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|summary|Series_reference|            Period|        Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+-------+----------------+------------------+------------------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|  count|            1936|              1936|              1936|         0|  1936|   1936|     1936|                1936|                1936|                1936|                1936|          1936|          1936|             0|
|   mean|            NULL| 2018.217975206615|2704.3055568181853|      NULL|  NUL

In [17]:
grouped_df = df_p.groupBy("Series_title_2").agg({"Series_title_2": "count"})
grouped_df.show(truncate=False)

+---------------------------------------------------------------------------+---------------------+
|Series_title_2                                                             |count(Series_title_2)|
+---------------------------------------------------------------------------+---------------------+
|Other Services                                                             |76                   |
|Food, Beverage and Tobacco Product Manufacturing                           |76                   |
|Mining                                                                     |76                   |
|Administrative and Support Services                                        |76                   |
|Arts and Recreation Services                                               |76                   |
|Printing                                                                   |76                   |
|Fishing, Aquaculture and Agriculture, Forestry and Fishing Support Services|76                   |


In [18]:
filtered_df = df_p.filter(df_p["Series_title_2"] == "Other Services")
filtered_df.show(truncate=False)

+----------------+-------+----------+----------+------+-------+---------+------------------------------+------------------------------+-----------------------------------+--------------+--------------+--------------+--------------+
|Series_reference|Period |Data_value|Suppressed|STATUS|UNITS  |Magnitude|Subject                       |Group                         |Series_title_1                     |Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+------------------------------+------------------------------+-----------------------------------+--------------+--------------+--------------+--------------+
|BDCQ.SF1RS2CA   |2016.06|2024.111  |NULL      |F     |Dollars|6        |Business Data Collection - BDC|Industry by financial variable|Sales (operating income)           |Other Services|Current prices|Unadjusted    |NULL          |
|BDCQ.SF1RS2CA   |2016.09|2130.915  |NULL      |F     |Dollars|6        

In [19]:
selected_df = df_p.select('Period', 'Data_value', 'Subject', 'Group')
selected_df.show(truncate=False)

+-------+----------+------------------------------+------------------------------+
|Period |Data_value|Subject                       |Group                         |
+-------+----------+------------------------------+------------------------------+
|2016.06|1116.386  |Business Data Collection - BDC|Industry by financial variable|
|2016.09|1070.874  |Business Data Collection - BDC|Industry by financial variable|
|2016.12|1054.408  |Business Data Collection - BDC|Industry by financial variable|
|2017.03|1010.665  |Business Data Collection - BDC|Industry by financial variable|
|2017.06|1233.7    |Business Data Collection - BDC|Industry by financial variable|
|2017.09|1282.436  |Business Data Collection - BDC|Industry by financial variable|
|2017.12|1290.82   |Business Data Collection - BDC|Industry by financial variable|
|2018.03|1412.007  |Business Data Collection - BDC|Industry by financial variable|
|2018.06|1488.055  |Business Data Collection - BDC|Industry by financial variable|
|201

In [28]:
sorted_df = df_p.sort("Period")
sorted_df.show(truncate=False)

+----------------+-------+----------+----------+------+-------+---------+------------------------------+------------------------------+-----------------------------------+---------------------------------------------------------------------------+--------------+--------------+--------------+
|Series_reference|Period |Data_value|Suppressed|STATUS|UNITS  |Magnitude|Subject                       |Group                         |Series_title_1                     |Series_title_2                                                             |Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+------------------------------+------------------------------+-----------------------------------+---------------------------------------------------------------------------+--------------+--------------+--------------+
|BDCQ.SF1EE1CA   |2016.06|13386.067 |NULL      |F     |Dollars|6        |Business Data Collection - BDC|Industry by finan

In [23]:
# Crear vista temporal a partir del dataframe
df_p.createOrReplaceTempView("business_tbl")

In [26]:
result = spark.sql("SELECT Period, Data_value, Subject, Group, Series_title_2 FROM business_tbl WHERE Series_title_2 = 'Other Services'")
result.show(truncate=False)

+-------+----------+------------------------------+------------------------------+--------------+
|Period |Data_value|Subject                       |Group                         |Series_title_2|
+-------+----------+------------------------------+------------------------------+--------------+
|2016.06|2024.111  |Business Data Collection - BDC|Industry by financial variable|Other Services|
|2016.09|2130.915  |Business Data Collection - BDC|Industry by financial variable|Other Services|
|2016.12|2212.397  |Business Data Collection - BDC|Industry by financial variable|Other Services|
|2017.03|2106.601  |Business Data Collection - BDC|Industry by financial variable|Other Services|
|2017.06|2214.264  |Business Data Collection - BDC|Industry by financial variable|Other Services|
|2017.09|2272.563  |Business Data Collection - BDC|Industry by financial variable|Other Services|
|2017.12|2304.067  |Business Data Collection - BDC|Industry by financial variable|Other Services|
|2018.03|2159.722  |

In [27]:
result = spark.sql("SELECT Subject, Group, Series_title_2, COUNT(*) as Conteo FROM business_tbl GROUP BY Subject, Group, Series_title_2")
result.show(truncate=False)

+------------------------------+------------------------------+---------------------------------------------------------------------------+------+
|Subject                       |Group                         |Series_title_2                                                             |Conteo|
+------------------------------+------------------------------+---------------------------------------------------------------------------+------+
|Business Data Collection - BDC|Industry by financial variable|Printing                                                                   |76    |
|Business Data Collection - BDC|Industry by financial variable|Transport, Postal and Warehousing                                          |76    |
|Business Data Collection - BDC|Industry by financial variable|Transport Equipment, Machinery and Equipment Manufacturing                 |76    |
|Business Data Collection - BDC|Industry by financial variable|Furniture and Other Manufacturing                      

In [30]:
result = spark.sql("SELECT Period, COUNT(*) as Conteo FROM business_tbl GROUP BY Period ORDER BY Period")
result.show(truncate=False)

+-------+------+
|Period |Conteo|
+-------+------+
|2016.06|96    |
|2016.09|96    |
|2016.12|96    |
|2017.03|96    |
|2017.06|96    |
|2017.09|104   |
|2017.12|104   |
|2018.03|104   |
|2018.06|104   |
|2018.09|104   |
|2018.12|104   |
|2019.03|104   |
|2019.06|104   |
|2019.09|104   |
|2019.12|104   |
|2020.03|104   |
|2020.06|104   |
|2020.09|104   |
|2020.12|104   |
+-------+------+



**Crear tabla a partir de un JSON**

In [40]:
json_df = spark.read.json("/content/drive/MyDrive/Cursos/Hadoop y Spark/Data/example.json", multiLine=True)

In [41]:
json_df.printSchema()

root
 |-- bio: string (nullable = true)
 |-- id: string (nullable = true)
 |-- language: string (nullable = true)
 |-- name: string (nullable = true)
 |-- version: double (nullable = true)



In [43]:
selected_df = json_df.select('id', 'language', 'name', 'version')
selected_df.show(truncate=False)

+----------------+----------------+-----------------+-------+
|id              |language        |name             |version|
+----------------+----------------+-----------------+-------+
|V59OF92YF627HFY0|Sindhi          |Adeel Solangi    |6.1    |
|ENTOCR13RSCLZ6KU|Sindhi          |Afzal Ghaffar    |1.88   |
|IAKPO3R4761JDRVG|Sindhi          |Aamir Solangi    |7.27   |
|5ZVOEPMJUI4MB4EN|Uyghur          |Abla Dilmurat    |2.53   |
|6VTI8X6LL0MMPJCC|Uyghur          |Adil Eli         |6.49   |
|F2KEU5L7EHYSYFTT|Uyghur          |Adile Qadir      |1.9    |
|LO6DVTZLRK68528I|Uyghur          |Abdukerim Ibrahim|5.9    |
|LJRIULRNJFCNZJAJ|Sindhi          |Adil Abro        |9.32   |
|JMCL0CXNXHPL1GBC|Galician        |Afonso Vilarchán |5.21   |
|KU4T500C830697CW|Maltese         |Mark Schembri    |3.17   |
|XOF91ZR7MHV1TXRS|Galician        |Antía Sixirei    |6.44   |
|FTSNV411G5MKLPDT|Uyghur          |Aygul Mutellip   |9.1    |
|OJMWMEEQWMLDU29P|Sindhi          |Awais Shaikh     |1.59   |
|5G646V7

In [44]:
# Crear vista temporal a partir del dataframe
json_df.createOrReplaceTempView("example_tbl")

In [45]:
result = spark.sql("SELECT language, COUNT(*) as Conteo FROM example_tbl GROUP BY language ORDER BY language")
result.show(truncate=False)

+----------------+------+
|language        |Conteo|
+----------------+------+
|Bosnian         |1120  |
|Galician        |1520  |
|Hindi           |3340  |
|Icelandic       |1280  |
|Maltese         |1600  |
|Sesotho sa Leboa|1600  |
|Setswana        |1280  |
|Sindhi          |1520  |
|Uyghur          |1600  |
|isiZulu         |980   |
+----------------+------+



In [46]:
result = spark.sql("SELECT id, language, name, version FROM example_tbl WHERE name LIKE 'A%'")
result.show(truncate=False)

+----------------+--------+-----------------+-------+
|id              |language|name             |version|
+----------------+--------+-----------------+-------+
|V59OF92YF627HFY0|Sindhi  |Adeel Solangi    |6.1    |
|ENTOCR13RSCLZ6KU|Sindhi  |Afzal Ghaffar    |1.88   |
|IAKPO3R4761JDRVG|Sindhi  |Aamir Solangi    |7.27   |
|5ZVOEPMJUI4MB4EN|Uyghur  |Abla Dilmurat    |2.53   |
|6VTI8X6LL0MMPJCC|Uyghur  |Adil Eli         |6.49   |
|F2KEU5L7EHYSYFTT|Uyghur  |Adile Qadir      |1.9    |
|LO6DVTZLRK68528I|Uyghur  |Abdukerim Ibrahim|5.9    |
|LJRIULRNJFCNZJAJ|Sindhi  |Adil Abro        |9.32   |
|JMCL0CXNXHPL1GBC|Galician|Afonso Vilarchán |5.21   |
|XOF91ZR7MHV1TXRS|Galician|Antía Sixirei    |6.44   |
|FTSNV411G5MKLPDT|Uyghur  |Aygul Mutellip   |9.1    |
|OJMWMEEQWMLDU29P|Sindhi  |Awais Shaikh     |1.59   |
|5G646V7E6TJW8X2M|Sindhi  |Ambreen Ahmed    |2.35   |
|70RODUVRD95CLOJL|Uyghur  |Aytürk Qasim     |1.32   |
|5RCTVD3C5QGVAKTQ|Uyghur  |Azrugul Osman    |3.18   |
|3WNLUZ5LT2F7MYVU|Sindhi  |A

In [49]:
result = spark.sql("SELECT language, MAX(version) as Maximo, MIN(version) as Minimo FROM example_tbl GROUP BY language")
result.show(truncate=False)

+----------------+------+------+
|language        |Maximo|Minimo|
+----------------+------+------+
|Maltese         |9.97  |2.09  |
|Icelandic       |9.59  |2.09  |
|isiZulu         |9.37  |1.18  |
|Uyghur          |9.1   |1.01  |
|Bosnian         |9.56  |1.06  |
|Sindhi          |9.48  |1.4   |
|Setswana        |9.07  |1.07  |
|Hindi           |9.99  |1.08  |
|Sesotho sa Leboa|9.55  |1.88  |
|Galician        |9.33  |1.08  |
+----------------+------+------+



In [50]:
# Detener spark session
spark.stop()