<H1><span style="color:orange"> Spark SQL and DataFrames: Introduction to Built-in Data Sources.</span>
</H1>

## Chapter 4 - Learning Spark Lightning-Fast Data Analytics

### **Santiago Jejen Salinas**

***Comunidad de Apache spark Latam***

*12/05/2021*
<img class="izquierda" src="img/apache.png">
<img class="izquierda" src="img/book.png">
     



![Spark SQL](img/SparkSql.png)

# Contents
1. [**Using Spark SQL in Spark Applications**](#id1)
    
2. [**SQL Tables and Views**](#id2)

    2.1 [Creating SQL Databases and Tables](#id2.1)
    
    2.2 [Creating Views](#id2.2)
    
    2.3 [Viewing the Metadata](#id2.3)
    
3. [**Data Sources for DataFrames and SQL Tables**](#id3)

    3.1 [DataFrameReader](#id3.1)
    
    3.2 [DataFrameWriter](#id3.2)
    
    3.3 [JSON](#id3.3)
    
    3.4 [CSV](#id3.4)
    
    3.5 [Images](#id3.5)


```from pyspark.sql import SparkSession```

Create a SparkSession

```spark = SparkSession.builder.appName("SparkSQLExampleApp").getOrCreate()```

Path to data set 

```csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"```


- Read the file and create a temporary view
- Infer the schema (note that for larger files you may want to specify the schema)

```
df = (spark.read.format("csv")
      .option("inferSchema", "true")
      .option("header", "true")
      .load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")
```




```schema = "`date` STRING, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"```
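The schema string is defined here, but the read itself still relies on `inferSchema`. As a minimal sketch, assuming the CSV columns match the string, the schema could be handed to the reader through `.schema(...)` so that Spark does not need an extra pass over the file to guess the types. The helper name `load_csv_with_schema` is hypothetical, and `spark` is expected to be an existing SparkSession.

```python
def load_csv_with_schema(spark, path, ddl_schema):
    """Read a CSV file that has a header row, using an explicit schema.

    `spark` is an existing SparkSession and `ddl_schema` is a DDL-style
    string such as "`date` STRING, `delay` INT". Hypothetical helper name.
    """
    return (spark.read.format("csv")
            .option("header", "true")   # first line holds the column names
            .schema(ddl_schema)         # explicit schema, so no inference pass
            .load(path))
```

With the names used in this section, the call would be `df = load_csv_with_schema(spark, csv_file, schema)`.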

In [92]:
from pyspark.sql import SparkSession 
spark = (SparkSession
 .builder
 .appName("SparkSQLExampleApp")
 .getOrCreate())


## Using Spark SQL in Spark Applications <a name="id1"></a>

In [2]:
csv_file = "/home/jovyan/work/Data/colombia_imports.csv"


In [3]:
df = (spark.read.format("csv")
 .option("inferSchema", "true")
 .option("header", "true")
 .option("sep", "|")
 .load(csv_file))
df.createOrReplaceTempView("colombia_imports_tbl")

In [4]:
df.show(10)

+---+---------+----+------+----+-----+----------+
|_c0|id_import|year|origin|dest|sitc4|export_val|
+---+---------+----+------+----+-----+----------+
|  0|    11564|1962|   civ| col|  712|   13000.0|
|  2|    17741|1962|   cod| col| 2925|    2000.0|
|  3|    20216|1962|   cog| col| 2925|    2000.0|
|  6|    29089|1962|   egy| col| 2631|   40000.0|
|  7|    32443|1962|   eth| col| 2922|    4000.0|
|  8|    47456|1962|   ken| col| 5320|    2000.0|
|  9|    47457|1962|   ken| col|  752|   15000.0|
| 10|    47458|1962|   ken| col| 2682|   22000.0|
| 14|    62938|1962|   mdg| col|  751|    1000.0|
| 15|    62939|1962|   mdg| col|  752|   37000.0|
+---+---------+----+------+----+-----+----------+
only showing top 10 rows



In [5]:
df.createOrReplaceTempView("colombia_imports")


In [6]:
spark.sql("""SELECT export_val, origin, dest
FROM colombia_imports WHERE export_val > 13000.0
ORDER BY export_val DESC""").show(10)

+--------------+------+----+
|    export_val|origin|dest|
+--------------+------+----+
|3.9901394405E9|   usa| col|
|1.9297295985E9|   chn| col|
| 1.424137562E9|   usa| col|
| 1.252119752E9|   usa| col|
|1.0759772505E9|   chn| col|
| 1.045398661E9|   usa| col|
|  1.01269083E9|   usa| col|
|  9.81976222E8|   usa| col|
|  9.79463934E8|   usa| col|
|   9.6256271E8|   fra| col|
+--------------+------+----+
only showing top 10 rows



In [7]:
spark.sql("""SELECT export_val, origin, dest
FROM colombia_imports
WHERE export_val > 13000.0 AND origin = 'mex' AND dest = 'col'
ORDER by export_val ASC""").show(10)


+----------+------+----+
|export_val|origin|dest|
+----------+------+----+
|   13001.5|   mex| col|
|   13021.0|   mex| col|
|   13040.0|   mex| col|
|   13046.0|   mex| col|
|   13048.0|   mex| col|
|   13051.0|   mex| col|
|   13051.0|   mex| col|
|   13082.0|   mex| col|
|   13085.0|   mex| col|
|   13088.0|   mex| col|
+----------+------+----+
only showing top 10 rows



In [8]:
from pyspark.sql.functions import col, desc

In [9]:
(df.select("export_val", "origin", "dest")
 .where(col("export_val") > 13000.0)
 .orderBy(desc("export_val"))).show(10)

+--------------+------+----+
|    export_val|origin|dest|
+--------------+------+----+
|3.9901394405E9|   usa| col|
|1.9297295985E9|   chn| col|
| 1.424137562E9|   usa| col|
| 1.252119752E9|   usa| col|
|1.0759772505E9|   chn| col|
| 1.045398661E9|   usa| col|
|  1.01269083E9|   usa| col|
|  9.81976222E8|   usa| col|
|  9.79463934E8|   usa| col|
|   9.6256271E8|   fra| col|
+--------------+------+----+
only showing top 10 rows



In [10]:
(df.select("export_val", "origin", "dest")
 .where("export_val > 13000.0")
 .orderBy("export_val",ascending=False).show(10))

+--------------+------+----+
|    export_val|origin|dest|
+--------------+------+----+
|3.9901394405E9|   usa| col|
|1.9297295985E9|   chn| col|
| 1.424137562E9|   usa| col|
| 1.252119752E9|   usa| col|
|1.0759772505E9|   chn| col|
| 1.045398661E9|   usa| col|
|  1.01269083E9|   usa| col|
|  9.81976222E8|   usa| col|
|  9.79463934E8|   usa| col|
|   9.6256271E8|   fra| col|
+--------------+------+----+
only showing top 10 rows
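The Mexico query from cell `In [7]` is shown only in SQL. A possible DataFrame-API counterpart, sketched under the assumption that `df` is the DataFrame loaded from `colombia_imports.csv`, passes the same predicate to `where` as a SQL expression string. The function name `imports_from` and its parameters are illustrative only, and the arguments are treated as trusted literals.

```python
def imports_from(df, origin, min_value=13000.0, dest="col"):
    """DataFrame-API counterpart of the SQL query filtered by origin.

    Keeps rows whose export_val exceeds `min_value` and whose origin and
    destination codes match, sorted by export_val in ascending order.
    """
    predicate = (f"export_val > {min_value} "
                 f"AND origin = '{origin}' AND dest = '{dest}'")
    return (df.select("export_val", "origin", "dest")
            .where(predicate)
            .orderBy("export_val"))   # ascending is the default order
```

Calling `imports_from(df, "mex").show(10)` would be expected to list the same rows as the SQL version.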



##  SQL Tables and Views<a name="id2"></a>


### Creating SQL Databases and Tables <a name="id2.1"></a>

Spark can create two kinds of tables:

- Managed: Spark manages both the metadata and the data, which is stored in its warehouse directory.
- Unmanaged: Spark manages only the metadata; the data stays at a location you specify.

In [11]:
spark.sql("CREATE DATABASE learn_spark_db")

DataFrame[]

In [12]:
spark.sql("USE learn_spark_db")


DataFrame[]

From now on, every table created by the following commands will be stored in **learn_spark_db**.

#### Creating a managed table

In [13]:
df.write.saveAsTable("managed_colombia_imports_tbl")

#### Creating an unmanaged table

In [14]:
df.write.option("path", "/tmp/data/colombia_imports").saveAsTable("colombia_imports_tbl")
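The two cells differ only in the `path` option: when it is set, the files stay at that location and the catalog reports the table as `EXTERNAL`; when it is absent, the table is `MANAGED`. A small wrapper makes the switch explicit. The name `save_table` is hypothetical, and `df` can be any Spark DataFrame.

```python
def save_table(df, table_name, path=None):
    """Save `df` as a managed table, or as an unmanaged one when `path` is set.

    Returns the table type the catalog is expected to report.
    """
    writer = df.write
    if path is not None:
        # An explicit location makes the table unmanaged (EXTERNAL):
        # dropping it later removes only the metadata, not the files.
        writer = writer.option("path", path)
    writer.saveAsTable(table_name)
    return "EXTERNAL" if path is not None else "MANAGED"
```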


### Creating Views<a name="id2.2"></a>

Views can be created on top of existing tables. They can be:

- Global: visible to every SparkSession within the same Spark application.
- Temporary: tied to a single SparkSession; they disappear when that session ends.

In [15]:
df_col_mex = spark.sql("SELECT id_import,year,origin,dest, export_val FROM colombia_imports_tbl WHERE origin = 'mex'")
df_col_mex.show(20)

+---------+----+------+----+----------+
|id_import|year|origin|dest|export_val|
+---------+----+------+----+----------+
| 88763538|2007|   mex| col|   41001.0|
| 88763539|2007|   mex| col|   15075.0|
| 88763540|2007|   mex| col| 1624579.0|
| 88763541|2007|   mex| col|  729561.0|
| 88763542|2007|   mex| col|  890847.0|
| 88763543|2007|   mex| col| 1141666.0|
| 88763544|2007|   mex| col| 2089748.0|
| 88763545|2007|   mex| col| 1118410.0|
| 88763546|2007|   mex| col|   55302.0|
| 88763547|2007|   mex| col|  885914.0|
| 88763548|2007|   mex| col|       1.0|
| 88763549|2007|   mex| col| 1608388.0|
| 88763550|2007|   mex| col|    4107.0|
| 88763551|2007|   mex| col|    8218.0|
| 88763552|2007|   mex| col| 4051575.0|
| 88763553|2007|   mex| col| 1429849.0|
| 88763554|2007|   mex| col|  115630.0|
| 88763555|2007|   mex| col|  568444.0|
| 88763556|2007|   mex| col|  487274.0|
| 88763557|2007|   mex| col|   32457.0|
+---------+----+------+----+----------+
only showing top 20 rows



In [16]:
df_col_usa = spark.sql("SELECT id_import,year,origin,dest, export_val FROM colombia_imports_tbl WHERE origin = 'usa'")
df_col_usa.show(20)

+---------+----+------+----+-----------+
|id_import|year|origin|dest| export_val|
+---------+----+------+----+-----------+
| 88916809|2007|   usa| col|  1319382.0|
| 88916810|2007|   usa| col|   275060.0|
| 88916811|2007|   usa| col|  3236603.0|
| 88916812|2007|   usa| col|  4897345.0|
| 88916813|2007|   usa| col|1.6200901E7|
| 88916814|2007|   usa| col|   343708.0|
| 88916815|2007|   usa| col|2.3701074E7|
| 88916816|2007|   usa| col|1.6073981E7|
| 88916817|2007|   usa| col|  2651315.0|
| 88916818|2007|   usa| col|   686890.0|
| 88916819|2007|   usa| col|  1533487.0|
| 88916820|2007|   usa| col|    95363.0|
| 88916821|2007|   usa| col|   571721.0|
| 88916822|2007|   usa| col|    32443.0|
| 88916823|2007|   usa| col|  4248498.0|
| 88916824|2007|   usa| col|   115508.0|
| 88916825|2007|   usa| col|1.5794855E7|
| 88916826|2007|   usa| col|2.1177728E7|
| 88916827|2007|   usa| col|  1616648.0|
| 88916828|2007|   usa| col|   409290.0|
+---------+----+------+----+-----------+
only showing top

In [17]:
# Create a global view

df_col_mex.createOrReplaceGlobalTempView("colombia_imports_origin_mexico_global_tmp_view")

# Create a temporary view
df_col_usa.createOrReplaceTempView("colombia_imports_origin_usa_tmp_view")

In [18]:
# Global temporary views must be queried through the global_temp database

spark.sql("""SELECT export_val, origin, dest
FROM global_temp.colombia_imports_origin_mexico_global_tmp_view
WHERE export_val > 13000.0""").show(10)

+----------+------+----+
|export_val|origin|dest|
+----------+------+----+
|   41001.0|   mex| col|
|   15075.0|   mex| col|
| 1624579.0|   mex| col|
|  729561.0|   mex| col|
|  890847.0|   mex| col|
| 1141666.0|   mex| col|
| 2089748.0|   mex| col|
| 1118410.0|   mex| col|
|   55302.0|   mex| col|
|  885914.0|   mex| col|
+----------+------+----+
only showing top 10 rows
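Visibility is the practical difference between the two kinds of view. As a sketch, assuming the global view from cell `In [17]` still exists, a second session obtained with `spark.newSession()` can query it through `global_temp`, whereas a session-scoped temporary view would not resolve there. `read_global_view` is a hypothetical helper name.

```python
def read_global_view(spark, view_name):
    """Query a global temporary view from a fresh session of the same application."""
    # newSession() shares the SparkContext but has its own temporary views.
    other_session = spark.newSession()
    # Global temporary views live in the reserved `global_temp` database.
    return other_session.sql(f"SELECT * FROM global_temp.{view_name}")
```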



In [19]:
# Drop the views
spark.catalog.dropGlobalTempView("colombia_imports_origin_mexico_global_tmp_view")
spark.catalog.dropTempView("colombia_imports_origin_usa_tmp_view")


### Viewing the Metadata<a name="id2.3"></a>

In [20]:
spark.catalog.listDatabases()


[Database(name='default', description='default database', locationUri='file:/home/jovyan/work/spark_session_04/spark-warehouse'),
 Database(name='learn_spark_db', description='', locationUri='file:/home/jovyan/work/spark_session_04/spark-warehouse/learn_spark_db.db')]

In [21]:
spark.catalog.listTables()


[Table(name='colombia_imports_tbl', database='learn_spark_db', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='managed_colombia_imports_tbl', database='learn_spark_db', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='colombia_imports', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='us_delay_flights_tbl', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [22]:
spark.catalog.listColumns("colombia_imports_tbl")

[Column(name='_c0', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='id_import', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='year', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='origin', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='dest', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='sitc4', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='export_val', description=None, dataType='double', nullable=True, isPartition=False, isBucket=False)]
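The catalog entries are plain records, so they can be processed with ordinary Python. The sketch below, under the hypothetical name `tables_by_type`, groups table names by their `tableType` field. It assumes only that each entry exposes `name` and `tableType`, as the `Table(...)` output shows.

```python
from collections import defaultdict

def tables_by_type(spark, db_name=None):
    """Group the names returned by spark.catalog.listTables() by tableType."""
    grouped = defaultdict(list)
    tables = (spark.catalog.listTables(db_name) if db_name
              else spark.catalog.listTables())
    for table in tables:
        grouped[table.tableType].append(table.name)
    return dict(grouped)
```

For the session shown here, the result would contain one `EXTERNAL`, one `MANAGED` and two `TEMPORARY` entries.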

## Data Sources for DataFrames and SQL Tables<a name="id3"></a>


### DataFrameReader<a name="id3.1"></a>

```DataFrameReader.format(args).option("key", "value").schema(args).load()```

```SparkSession.read```

```SparkSession.readStream```

![Read Data Spark](img/Read_data_spark.png)
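The reader pattern maps directly onto a small function. In this sketch `read_source` and its parameters are hypothetical; it only chains `format`, `option`, `schema` and `load` on `spark.read`, in the order the pattern lists them.

```python
def read_source(spark, fmt, path, options=None, schema=None):
    """Apply the format -> option -> schema -> load pattern to spark.read."""
    reader = spark.read.format(fmt)
    for key, value in (options or {}).items():
        reader = reader.option(key, value)
    if schema is not None:
        reader = reader.schema(schema)   # a StructType or a DDL string
    return reader.load(path)
```

For example, `read_source(spark, "csv", csv_file, {"inferSchema": "true", "header": "true", "sep": "|"})` would issue the same read as cell `In [3]`.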


### DataFrameWriter <a name="id3.2"></a>


```DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)```

```DataFrame.write```

```DataFrame.writeStream```

![Write Data Spark](img/Write_data_spark.png)
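One detail of the `sortBy(...).saveAsTable(...)` pattern is worth spelling out: Spark accepts `sortBy` only together with `bucketBy`. A minimal sketch follows, assuming a DataFrame with `origin` and `export_val` columns like the imports data used earlier. The helper name, the bucket count and the Parquet format are illustrative choices, not taken from the notebook.

```python
def save_bucketed_table(df, table_name, num_buckets=8):
    """Write `df` as a table bucketed by origin and sorted by export_val."""
    (df.write.format("parquet")
       .mode("overwrite")
       .bucketBy(num_buckets, "origin")   # sortBy is only valid after bucketBy
       .sortBy("export_val")
       .saveAsTable(table_name))
```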


### JSON <a name="id3.3"></a>



In [55]:
file = "/home/jovyan/work/Data/covid.json"

In [56]:
df = spark.read.format("json").load(file)
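By default Spark's JSON source reads and writes JSON Lines: each line of a file is one complete JSON object (a single pretty-printed document needs `.option("multiLine", "true")` on the reader). The layout can be illustrated with the standard library alone. The two records below are made-up sample data, not rows from `covid.json`, and the function names are hypothetical.

```python
import json

def to_json_lines(records):
    """Serialize dicts the way a JSON Lines file stores them: one object per line."""
    return "\n".join(json.dumps(r, ensure_ascii=False) for r in records)

def from_json_lines(text):
    """Parse JSON Lines text back into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

# Made-up records with a nested struct, similar in shape to the `user` column.
sample = [
    {"user": {"id": 1, "name": "ana", "followers_count": 10}},
    {"user": {"id": 2, "name": "luis", "followers_count": 3}},
]
text = to_json_lines(sample)
print(text)
```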


In [57]:
df.printSchema()



root
 |-- contributors: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- embeddable: boolean (nullable = true)
 |    |    |    |    |-- monetizable: boolean (nullable = true)
 |    |    |    |    |-- title: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |

In [58]:
df.columns

['contributors',
 'coordinates',
 'created_at',
 'display_text_range',
 'entities',
 'extended_entities',
 'extended_tweet',
 'favorite_count',
 'favorited',
 'filter_level',
 'geo',
 'id',
 'id_str',
 'in_reply_to_screen_name',
 'in_reply_to_status_id',
 'in_reply_to_status_id_str',
 'in_reply_to_user_id',
 'in_reply_to_user_id_str',
 'is_quote_status',
 'lang',
 'place',
 'possibly_sensitive',
 'quote_count',
 'quoted_status',
 'quoted_status_id',
 'quoted_status_id_str',
 'quoted_status_permalink',
 'reply_count',
 'retweet_count',
 'retweeted',
 'retweeted_status',
 'source',
 'text',
 'timestamp_ms',
 'truncated',
 'user',
 'withheld_in_countries']

In [72]:
df_users=df.select("user")

In [73]:
df_users.printSchema()

root
 |-- user: struct (nullable = true)
 |    |-- contributors_enabled: boolean (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- default_profile: boolean (nullable = true)
 |    |-- default_profile_image: boolean (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- favourites_count: long (nullable = true)
 |    |-- follow_request_sent: string (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    |-- following: string (nullable = true)
 |    |-- friends_count: long (nullable = true)
 |    |-- geo_enabled: boolean (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- id_str: string (nullable = true)
 |    |-- is_translator: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- listed_count: long (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- notifications: string (nullable = true)
 |    |-- profile_background_color: str

In [76]:
(df_users.write.format("json")
 .mode("overwrite")
 .option("compression", "snappy")
 .save("/home/jovyan/work/Data/df_users"))


Py4JJavaError: An error occurred while calling o269.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 48.0 failed 1 times, most recent failure: Lost task 1.0 in stage 48.0 (TID 182, e767a7fe1a7c, executor driver): java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()'
	at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
	at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
	at org.apache.hadoop.io.compress.SnappyCodec.getCompressorType(SnappyCodec.java:136)
	at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:150)
	at org.apache.hadoop.io.compress.CompressionCodec$Util.createOutputStreamWithCodecPool(CompressionCodec.java:131)
	at org.apache.hadoop.io.compress.SnappyCodec.createOutputStream(SnappyCodec.java:102)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.$anonfun$createOutputStream$1(CodecStreams.scala:84)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:84)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.<init>(JsonOutputWriter.scala:47)
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anon$1.newInstance(JsonFileFormat.scala:83)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:264)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
	... 32 more
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()'
	at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
	at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
	at org.apache.hadoop.io.compress.SnappyCodec.getCompressorType(SnappyCodec.java:136)
	at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:150)
	at org.apache.hadoop.io.compress.CompressionCodec$Util.createOutputStreamWithCodecPool(CompressionCodec.java:131)
	at org.apache.hadoop.io.compress.SnappyCodec.createOutputStream(SnappyCodec.java:102)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.$anonfun$createOutputStream$1(CodecStreams.scala:84)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:84)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.<init>(JsonOutputWriter.scala:47)
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anon$1.newInstance(JsonFileFormat.scala:83)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:264)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
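
The root cause sits in the `Caused by` lines of the trace: `java.lang.UnsatisfiedLinkError` on `NativeCodeLoader.buildSupportsSnappy()` indicates that the Hadoop native library with Snappy support is not available in this environment, so the Snappy codec cannot be loaded for the JSON output. One possible workaround, if compressed output is still wanted, is a codec that typically does not depend on that native library, such as gzip. The helper name below is hypothetical and the choice of gzip is an assumption, not what the notebook does: cell `In [77]` simply writes without compression.

```python
def write_json_compressed(df, path, codec="gzip"):
    """Write `df` as JSON with a compression codec, overwriting existing output."""
    (df.write.format("json")
       .mode("overwrite")
       .option("compression", codec)   # e.g. "gzip" or "bzip2"; "none" disables it
       .save(path))
```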


In [77]:
df_users.write.json("/home/jovyan/work/Data/df_users/zipcodes.json")

![Write Read Data Json](img/json_read_write.png)

### CSV <a name="id3.4"></a>

In [80]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType

In [81]:
schema_pais= StructType([StructField("Nombre_pais", StringType(),False),StructField("Abrev", StringType(),False)])

In [82]:
df_pais=(spark.read.format("csv")
         .option("header","true")
         .schema(schema_pais)
         .option("nullValue", "") 
         .load("/home/jovyan/work/Data/country-list.csv"))

In [83]:
df_pais.show(10)

+-------------------+-----+
|        Nombre_pais|Abrev|
+-------------------+-----+
|            Albania|  ALB|
|            Algeria|  ALG|
|     American Samoa|  ASA|
|            Andorra|  AND|
|             Angola|  ANG|
|           Anguilla|  AIA|
|Antigua and Barbuda|  ATG|
|          Argentina|  ARG|
|            Armenia|  ARM|
|              Aruba|  ARU|
+-------------------+-----+
only showing top 10 rows



In [85]:
df_pais.write.format("csv").mode("overwrite").save("/home/jovyan/work/Data/df_csv")

![Write Read Data CSV](img/csv_read_write.png)

### Images <a name="id3.5"></a>

In [86]:
from pyspark.ml import image


In [87]:
image_dir = "/home/jovyan/work/Data/img"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()


root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



In [88]:
images_df.show(10)

+--------------------+
|               image|
+--------------------+
|[file:///home/jov...|
|[file:///home/jov...|
|[file:///home/jov...|
+--------------------+



In [90]:
images_df.select("image.height", "image.width", "image.nChannels", "image.mode").show(5, truncate=False)


+------+-----+---------+----+
|height|width|nChannels|mode|
+------+-----+---------+----+
|187   |270  |3        |16  |
|168   |300  |3        |16  |
|176   |287  |3        |16  |
+------+-----+---------+----+
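
The `mode` column holds an OpenCV-compatible type code. Such codes pack the element depth into the lowest three bits and the channel count minus one into the bits above, so `16` decodes to `CV_8UC3`: unsigned 8-bit data with three channels, consistent with `nChannels = 3`. A small decoder, under the hypothetical name `decode_ocv_mode`, makes this explicit.

```python
# Depth names for the lowest three bits of an OpenCV type code (codes 0 to 6).
_DEPTHS = ["CV_8U", "CV_8S", "CV_16U", "CV_16S", "CV_32S", "CV_32F", "CV_64F"]

def decode_ocv_mode(mode):
    """Return (type_name, channels) for an OpenCV-style type code."""
    depth = mode & 7               # lowest three bits: element depth
    channels = (mode >> 3) + 1     # remaining bits: channel count minus one
    if depth >= len(_DEPTHS):
        raise ValueError(f"unknown depth code {depth} in mode {mode}")
    return f"{_DEPTHS[depth]}C{channels}", channels

print(decode_ocv_mode(16))
```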

