# UDF

As UDFs (User-Defined Functions) no PySpark são funções personalizadas que você pode criar para aplicar operações complexas a colunas em um DataFrame. Elas permitem estender a funcionalidade do PySpark para realizar transformações de dados personalizadas.

Aqui está um exemplo de como criar e usar uma UDF no PySpark:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Inicialize uma sessão Spark
spark = SparkSession.builder.appName("ExemploUDF").getOrCreate()

# Crie um DataFrame de exemplo
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Nome", "Idade"])

# Defina uma função Python que será usada como UDF
def dobrar_idade(idade):
    return idade * 2

# Registre a função Python como uma UDF
dobrar_idade_udf = udf(dobrar_idade, IntegerType())

# Aplique a UDF à coluna 'Idade' e crie uma nova coluna 'IdadeDobrada'
df = df.withColumn("IdadeDobrada", dobrar_idade_udf(df["Idade"]))

# Mostre o DataFrame resultante
df.show()
```

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("Exemplo").config("spark.jars.packages", "org.postgresql:postgresql:42.2.24").getOrCreate()

In [4]:
%%sh
pip install unidecode

Collecting unidecode
  Downloading Unidecode-1.3.7-py3-none-any.whl (235 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/235.5 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.7


In [2]:
df = (
    spark
    .read
    .format("parquet")
    .load("/home/app/data/1.bronze/air_cia/")
)

df.printSchema()

root
 |-- razão_social: string (nullable = true)
 |-- icao_iata: string (nullable = true)
 |-- cnpj: string (nullable = true)
 |-- atividades_aéreas: string (nullable = true)
 |-- endereço_sede: string (nullable = true)
 |-- telefone: string (nullable = true)
 |-- e-mail: string (nullable = true)
 |-- decisão_operacional: string (nullable = true)
 |-- data_decisão_operacional: string (nullable = true)
 |-- validade_operacional: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- iata: string (nullable = true)



In [5]:
from unidecode import unidecode

air_cia = (
    df
    .toDF(*[unidecode(a) for a in df.columns])
)

air_cia.printSchema()

root
 |-- razao_social: string (nullable = true)
 |-- icao_iata: string (nullable = true)
 |-- cnpj: string (nullable = true)
 |-- atividades_aereas: string (nullable = true)
 |-- endereco_sede: string (nullable = true)
 |-- telefone: string (nullable = true)
 |-- e-mail: string (nullable = true)
 |-- decisao_operacional: string (nullable = true)
 |-- data_decisao_operacional: string (nullable = true)
 |-- validade_operacional: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- iata: string (nullable = true)



In [12]:
(
    air_cia
    .write
    .format("parquet")
    .mode("append")
    .save("/home/app/data/1.bronze/air_cia/")
)

Py4JJavaError: An error occurred while calling o75.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:288)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 5.0 failed 1 times, most recent failure: Lost task 2.0 in stage 5.0 (TID 15) (32dedbe0f239 executor driver): java.io.FileNotFoundException: 
File file:/home/app/data/1.bronze/air_cia/part-00002-2583d41e-419f-4114-bd9f-f673582a0c02-c000.snappy.parquet does not exist

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:561)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:323)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$22(FileFormatWriter.scala:266)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:255)
	... 41 more
Caused by: java.io.FileNotFoundException: 
File file:/home/app/data/1.bronze/air_cia/part-00002-2583d41e-419f-4114-bd9f-f673582a0c02-c000.snappy.parquet does not exist

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:561)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:323)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$22(FileFormatWriter.scala:266)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [5]:
 def remove_caracters_special(string):
    if string == None:
        return None
    else:
        return unidecode(string)

In [6]:
from pyspark.sql.types import StringType

convert_name_udf = F.udf(remove_caracters_special, StringType())

# Udf com decorator

In [9]:
from pyspark.sql.functions import *

@F.udf (returnType = StringType())
def cnpj_transforming(string):
    if string:
        return string.replace('.','').replace('/','').replace('-','')
    else:
        return None

@udf (returnType = StringType())
def telefone_transforming(string):
    if string:
        return string.replace('(','').replace(')','').replace(' ','').replace('-','').replace("|Fax:","/")
    else:
        return None

air_cia_final =(
    air_cia
    .withColumn("cnpj", cnpj_transforming(F.col("cnpj")))
    .withColumn("telefone", F.split(telefone_transforming(F.col('telefone')),"/"))
    .withColumn("endereco_sede", convert_name_udf(F.col('endereco_sede')))
    .withColumn("decisao_operacional", convert_name_udf(F.col('decisao_operacional')))
    .withColumn("atividades_aereas", convert_name_udf(F.col('atividades_aereas')))
)
air_cia_final.show(1, vertical=True, truncate=False)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------
 razao_social             | ABSA - AEROLINHAS BRASILEIRAS S.A.                                                                                           
 icao_iata                | LTG M3                                                                                                                       
 cnpj                     | 00074635000133                                                                                                               
 atividades_aereas        | TRANSPORTE AEREO REGULAR                                                                                                     
 endereco_sede            | AEROPORTO INTERNACIONAL DE VIRACOPOS, RODOVIA SANTOS DUMONT, KM 66, SISTEMA VIARIO PRINCIPAL, S/ No, 13.052-970, CAMPINAS-SP 
 telefone                 | [1155828055]                                    

In [10]:
air_cia_final = air_cia_final.drop_duplicates(["razao_social","icao_iata","cnpj"])
air_cia_final = air_cia_final.dropna()

In [11]:
air_cia_final.dtypes

[('razao_social', 'string'),
 ('icao_iata', 'string'),
 ('cnpj', 'string'),
 ('atividades_aereas', 'string'),
 ('endereco_sede', 'string'),
 ('telefone', 'array<string>'),
 ('e-mail', 'string'),
 ('decisao_operacional', 'string'),
 ('data_decisao_operacional', 'string'),
 ('validade_operacional', 'string'),
 ('icao', 'string'),
 ('iata', 'string')]

In [15]:
air_cia_final.where(F.col("data_decisao_operacional") == '22/04/2015' ).show()

+--------------------+---------+--------------+--------------------+--------------------+------------+--------------+-------------------+------------------------+--------------------+----+----+
|        razao_social|icao_iata|          cnpj|   atividades_aereas|       endereco_sede|    telefone|        e-mail|decisao_operacional|data_decisao_operacional|validade_operacional|icao|iata|
+--------------------+---------+--------------+--------------------+--------------------+------------+--------------+-------------------+------------------------+--------------------+----+----+
|ABSA - AEROLINHAS...|   LTG M3|00074635000133|TRANSPORTE AEREO ...|AEROPORTO INTERNA...|[1155828055]|gar@tam.com.br|      DECISAO No 41|              22/04/2015|          23/04/2025| LTG|  M3|
+--------------------+---------+--------------+--------------------+--------------------+------------+--------------+-------------------+------------------------+--------------------+----+----+

