# Spark

# Librerias

In [22]:
# general
from datetime import datetime, date
import pandas as pd
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# etl
import pyspark 
from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row
from pyspark.sql import Column
from pyspark.sql.functions import upper
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import expr

from pyspark.sql import functions as func
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, StringType



In [23]:
# crear sesion de Spark
session = SparkSession.builder.master('local').appName('app1').getOrCreate()  
session

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [8]:
# iniciar sesion de spark
#spark = SparkSession.builder.getOrCreate()
#spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
#spark

In [2]:
install_requires=[
        'pyspark=={site.SPARK_VERSION}'
    ]
install_requires

['pyspark=={site.SPARK_VERSION}']

# Intro

In [39]:

# definir esquema de DataFrame
# definir tipos de variables
myschema = StructType([\
                       StructField("userID", IntegerType(), True),
                       StructField("name", StringType(), True),
                       StructField("age",IntegerType(), True),
                       StructField("friends",IntegerType(), True),
                        ])
myschema

StructType([StructField('userID', IntegerType(), True), StructField('name', StringType(), True), StructField('age', IntegerType(), True), StructField('friends', IntegerType(), True)])

In [40]:

# crear DataFrame a partir de .csv
data = session.read.format("csv")\
    .schema(myschema)\
    .option("path","data/fakefriends.csv")\
    .load()
    
data

DataFrame[userID: int, name: string, age: int, friends: int]

In [11]:
# aplicar transformaciones

# seleccionar columnas: userID, name, age, friends
# condicion: age <30
# ordenar por: userID
data_trans = data.select(data.userID,data.name,data.age,data.friends)\
         .where(data.age < 30).withColumn('insert_ts', func.current_timestamp())\
         .orderBy(data.userID)

data_trans

DataFrame[userID: int, name: string, age: int, friends: int, insert_ts: timestamp]

In [12]:
# contar filas de salida
data_trans.count()


112

In [13]:
# crear Temp View para visualizar datos
data_trans.createOrReplaceTempView('data')


In [14]:
# ejecutar query de Spark SQL
session.sql('select name, age, friends, insert_ts from data').show()

+--------+---+-------+--------------------+
|    name|age|friends|           insert_ts|
+--------+---+-------+--------------------+
|Jean-Luc| 26|      2|2024-02-16 11:09:...|
|    Hugh| 27|    181|2024-02-16 11:09:...|
|  Weyoun| 22|    323|2024-02-16 11:09:...|
|   Miles| 19|    268|2024-02-16 11:09:...|
|  Julian| 25|      1|2024-02-16 11:09:...|
|     Ben| 21|    445|2024-02-16 11:09:...|
|  Julian| 22|    100|2024-02-16 11:09:...|
|     Nog| 26|    281|2024-02-16 11:09:...|
| Beverly| 27|    305|2024-02-16 11:09:...|
|    Morn| 25|     96|2024-02-16 11:09:...|
|   Brunt| 24|     49|2024-02-16 11:09:...|
|     Nog| 20|      1|2024-02-16 11:09:...|
| Beverly| 19|    269|2024-02-16 11:09:...|
|   Brunt| 19|      5|2024-02-16 11:09:...|
|  Geordi| 20|    100|2024-02-16 11:09:...|
|  Geordi| 21|    477|2024-02-16 11:09:...|
|  Kasidy| 22|    179|2024-02-16 11:09:...|
|   Brunt| 20|    384|2024-02-16 11:09:...|
|     Ben| 28|    311|2024-02-16 11:09:...|
|    Worf| 24|    492|2024-02-16

# DataFrames y Datasets

In [27]:
# cargar base de datos comom DataFrame
data1 = session.read.format('csv').\
option('inferSchema','true').\
option('header', 'true').\
option('path','data/operations_management.csv').\
load()

data1

DataFrame[description: string, industry: string, level: int, size: string, line_code: string, value: int]

In [28]:
data1.printSchema() # ver esquema de DataFrame

root
 |-- description: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- level: integer (nullable = true)
 |-- size: string (nullable = true)
 |-- line_code: string (nullable = true)
 |-- value: integer (nullable = true)



In [29]:
# aplicar transformaciones
# seleccionar columnas: industry, value
# filtrar columnas: condicion (value>200 & industry!='total)
# ordenar por: value descendiente
data1_trans = data1.select("industry","value").\
                    filter((col("value") > 200) & (col("industry") != "total")).\
                    orderBy(desc("value"))
data1_trans

DataFrame[industry: string, value: int]

In [30]:
data1_trans.printSchema() # ver esquema de DataFrame transformado

root
 |-- industry: string (nullable = true)
 |-- value: integer (nullable = true)



In [31]:
data1_trans.show() # ver DataFrame

+--------------------+-----+
|            industry|value|
+--------------------+-----+
|        Construction| 6030|
|        Construction| 5904|
|        Construction| 5229|
|Accommodation & f...| 5058|
|        Construction| 4965|
|        Construction| 4959|
|Accommodation & f...| 4950|
|        Construction| 4686|
|        Construction| 4668|
|        Construction| 4665|
|       Manufacturing| 4662|
|       Manufacturing| 4632|
|        Construction| 4575|
|        Construction| 4566|
|Professional, sci...| 4476|
|Professional, sci...| 4470|
|        Retail trade| 4434|
|        Retail trade| 4434|
|Accommodation & f...| 4251|
|Accommodation & f...| 4176|
+--------------------+-----+
only showing top 20 rows



In [34]:
# crear vista
data1.createOrReplaceTempView("data1")


In [35]:
# ejecutar query sql y ver tabla
data_2= session.sql("""SELECT * FROM data1""").show()

+--------------------+--------------------+-----+---------------+---------+-----+
|         description|            industry|level|           size|line_code|value|
+--------------------+--------------------+-----+---------------+---------+-----+
|Awareness of clim...|               total|    0| 6–19 employees| C0300.01|13080|
|Awareness of clim...|               total|    0|20–49 employees| C0300.01| 3348|
|Awareness of clim...|               total|    0|50–99 employees| C0300.01| 1089|
|Awareness of clim...|               total|    0| 100+ employees| C0300.01| 1023|
|Awareness of clim...|Agriculture, fore...|    1|          total| C0300.01| 2364|
|Awareness of clim...|         Agriculture|    2|          total| C0300.01| 1683|
|Awareness of clim...|  Commercial fishing|    2|          total| C0300.01|   27|
|Awareness of clim...|  Forestry & logging|    2|          total| C0300.01|  126|
|Awareness of clim...|Agriculture, fore...|    2|          total| C0300.01|  528|
|Awareness of cl

In [36]:
# lista de bases de datos
session.catalog.listDatabases()


[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/C:/Users/dfoso/Documents/GitHub/tech_skills/etl/spark-warehouse')]

In [37]:
# lista de tablas
session.catalog.listTables()

[Table(name='data', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='data1', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [55]:
# definir esquema de DataFrame
# definir tipos de variables
myschema = StructType([\
                       StructField("userID", IntegerType(), True),
                       StructField("name", StringType(), True),
                       StructField("age",IntegerType(), True),
                       StructField("friends",IntegerType(), True),
                        ])
myschema

StructType([StructField('userID', IntegerType(), True), StructField('name', StringType(), True), StructField('age', IntegerType(), True), StructField('friends', IntegerType(), True)])

In [56]:
# crear DataFrame a partir de .csv
data = session.read.format("csv")\
    .schema(myschema)\
    .option("path","data/fakefriends.csv")\
    .load()
    
data.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)



In [53]:
data.createOrReplaceTempView('view')

In [54]:
# guardar DataFrame como .csv
data.write\
    .format('csv').mode('overwrite')\
    .option('path', 'data/fakefriends_trans.csv')\
    .partitionBy('age')\
    .save()



Py4JJavaError: An error occurred while calling o283.save.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)



# Managed vs Unmanaged Tables

In [3]:
# crear PySpark DataFrame con lista de filas
data = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
data

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [4]:
# crear PySpark DataFrame con esquema explicito
data = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
data


DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [5]:
# crear PySpark DataFrame de pandas DataFrame
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
data = spark.createDataFrame(pandas_df)
data

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [8]:
data.show

<bound method DataFrame.show of DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]>

In [9]:
data.printSchema

<bound method DataFrame.printSchema of DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]>

In [10]:
# convertir PySpark DataFrame a Pandas DataFrame
data.a

Column<'a'>

In [12]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [13]:
data

Py4JJavaError: An error occurred while calling o89.htmlString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.htmlString(Dataset.scala:409)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more


DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [7]:
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

In [9]:
# crear Spark DataFrame 
data = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
data

DataFrame[color: string, fruit: string, v1: bigint, v2: bigint]

In [16]:
# crear Dataframe de archivo de texto
data_text = spark.read.text("data/buddhist.txt")
data_text

DataFrame[value: string]

In [18]:
data_text.count() # contar el numero de filas
data_text.first() # ver primera fila

Row(value='Buddhist universities and colleges in the United States')

In [19]:
# filtrar datos con patron
linesWithSpark = data_text.filter(data_text.value.contains("Spark"))
linesWithSpark

DataFrame[value: string]

In [21]:
# ver fila con mas palabras
data_text.select(sf.size(sf.split(data_text.value,"\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()

[Row(max(numWords)=41)]

In [22]:
linesWithSpark.cache()

DataFrame[value: string]

In [23]:
linesWithSpark.count() # contar filas con patron

0

In [24]:
# crear aplicacion de Spark simple 

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/c:/Users/dfoso/Documents/GitHub/tech_skills/etl/YOUR_SPARK_HOME/README.md.

# Spark SQL

## Intro

### Sesion

In [5]:
# crear sesion de spark
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Crear DataFrames

In [6]:

# crear DataFrame de archivo .json
df = spark.read.json('spark/examples/src/main/resources/people.json')
# Displays the content of the DataFrame to stdout
df.show()


+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [7]:
# ver esquema de DataFrame
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [8]:

# seleccionar columna   name
# mostrar columna
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [12]:
# seleccionar columna name, age
# agregar 1 a columna  age
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     NULL|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [13]:
# filtrar por condicion:  age > 21
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [15]:
# agrupar por:  age
# contar observaciones
df.groupBy("age").count().show()


+----+-----+
| age|count|
+----+-----+
|  19|    1|
|NULL|    1|
|  30|    1|
+----+-----+



In [16]:
# registar DataFrame en vista temporal
df.createOrReplaceTempView("people")


### Ejecutar Queries de SQL Programaticas

In [17]:
# seleccionar todas las columnas de vista   people 
sqlDF = spark.sql("SELECT * FROM people")
# mostrar DataFrame SQL
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Vista Global

In [18]:
# registar DataFrame en vista global de SQL
df.createGlobalTempView("people")


In [19]:
# seleccionar todas las columnas de vista global
# mostrar DataFrame SQL
spark.sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [20]:
# ver vista global desde nueva sesion de Spark
# seleccionar todas las columnas de vista global
# mostrar DataFrame SQL
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [22]:
# crear Spark Context
sc = spark.sparkContext
sc

In [27]:
# cargar archivo de texto
# convertir cada linea de texto en fila de DataFrame
lines = sc.textFile("spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(",")) # split lines por patron ','
people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # extraer personas


PythonRDD[61] at RDD at PythonRDD.scala:53

In [30]:
# inferir esquema de DataFrame
# crear vista temporal  people
#schemaPeople = spark.createDataFrame(people)
#schemaPeople.createOrReplaceTempView("people")
#schemaPeople

### Especificar Esquema

In [48]:
# crear Spark Context
sc = spark.sparkContext
sc

In [49]:
# cargar archivo de texto 
# convertir lines en tuplas
lines = sc.textFile("spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(",")) # split lineas por patron ','
people = parts.map(lambda p: (p[0], p[1].strip()))

In [50]:
# codificar esquema en  string
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

In [51]:
# aplicar esquema codificado a RDD 
schemaPeople = spark.createDataFrame(people, schema)
schemaPeople

DataFrame[name: string, age: string]

In [52]:
# registrar vista temporal  people
schemaPeople.createOrReplaceTempView("people1")

In [53]:
# ejecutar query sql sobre DataFrame registrada 
#results = spark.sql("SELECT name FROM people1")
#results.show()

## Fuentes de Datos

### Funciones Simples para Cargar/Guardar Archivos

In [7]:
# el formato default en Spark es .parquet
# cargar achivo parquet como DataFrame
df = spark.read.load("spark/examples/src/main/resources/users.parquet")

# seleccionar columnas:  name, favorite_color
# guardar achivo parquet:   namesAndFavColors.parquet
#df.select("name", "favorite_color").write.save("spark/examples/src/main/resources/namesAndFavColors.parquet")

In [8]:
# cargar achivo .json
# especificar formato
df = spark.read.load("spark/examples/src/main/resources/people.json", format="json")

# guardar DataFrame como archivo .parquet
#df.select("name", "age").write.save("spark/examples/src/main/resources/namesAndAges.parquet", format="parquet")

In [9]:
# cargar achivo .json
# especificar formato, separador
# inferir esquema
# incluir encabezado
df = spark.read.load("spark/examples/src/main/resources/people.csv",
                     format="csv", sep=";", inferSchema="true", header="true")


In [10]:
# cargar archivo .orc
df = spark.read.orc("spark/examples/src/main/resources/users.orc")
"""
(df.write.format("orc")
    .option("orc.bloom.filter.columns", "favorite_color")
    .option("orc.dictionary.key.threshold", "1.0")
    .option("orc.column.encoding.direct", "name")
    .save("users_with_options.orc"))
    """


'\n(df.write.format("orc")\n    .option("orc.bloom.filter.columns", "favorite_color")\n    .option("orc.dictionary.key.threshold", "1.0")\n    .option("orc.column.encoding.direct", "name")\n    .save("users_with_options.orc"))\n    '

In [11]:
# ejecutar queries de SQL directamente sobre archivo cargado
df = spark.sql("SELECT * FROM parquet.`spark/examples/src/main/resources/users.parquet`")
df

DataFrame[name: string, favorite_color: string, favorite_numbers: array<int>]

In [13]:
# df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

### Archivos Parquet

In [21]:
# cargar archivo .json
peopleDF = spark.read.json("spark/examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")



Py4JJavaError: An error occurred while calling o118.parquet.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
