In [5]:
spark

In [6]:
import pyspark.sql.types as st
from pyspark.sql import udf, Window, SparkSession, Row
from pyspark.sql import functions as f

spark = SparkSession.builder.appName('OPI_Exam').getOrCreate()

In [7]:
#!unzip profeco.pdf.zip

Archive:  profeco.pdf.zip
replace all_data.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: ^C


In [58]:
# Add a custom Schema

precios_schema = st.StructType([
    st.StructField('producto', st.StringType(), True),
    st.StructField('presentacion', st.StringType(), True),
    st.StructField('marca', st.StringType(), True),
    st.StructField('categoria', st.StringType(), True),
    st.StructField('catalogo', st.StringType(), True),
    st.StructField('precio', st.DecimalType(), True),
    st.StructField('fechaRegistro', st.TimestampType(), True),
    st.StructField('cadenaComercial', st.StringType(), True),
    st.StructField('giro', st.StringType(), True),
    st.StructField('nombreComercial', st.StringType(), True),
    st.StructField('direccion', st.StringType(), True),
    st.StructField('estado', st.StringType(), True),
    st.StructField('municipio', st.StringType(), True),
    st.StructField('latitud', st.StringType(), True),
    st.StructField('longitud', st.StringType(), True),
    
])

In [59]:
# Read raw data

df = spark.read.schema(precios_schema).options(header='true').csv('all_data.csv')
df.show(2)

+--------------------+--------------------+--------+----------------+----------------+------+-------------------+------------------+----------+--------------------+--------------------+----------------+--------------------+--------+----------+
|            producto|        presentacion|   marca|       categoria|        catalogo|precio|      fechaRegistro|   cadenaComercial|      giro|     nombreComercial|           direccion|          estado|           municipio| latitud|  longitud|
+--------------------+--------------------+--------+----------------+----------------+------+-------------------+------------------+----------+--------------------+--------------------+----------------+--------------------+--------+----------+
|CUADERNO FORMA IT...|96 HOJAS PASTA DU...|ESTRELLA|MATERIAL ESCOLAR|UTILES ESCOLARES|    26|2011-05-18 00:00:00|ABASTECEDORA LUMEN|PAPELERIAS|ABASTECEDORA LUME...|CANNES No. 6 ESQ....|DISTRITO FEDERAL|TLALPAN          ...|19.29699|-99.125417|
|            CRAYONES|CA

In [None]:
# I want to know the registered years

# 1.- Data pre-processing

## a. How many entries are?

In [9]:
print(f'There are {df.count()} entries in this file')

There are 62530715 entries in this file


## b. How many categories are?

In [10]:
categories = df.select(['categoria']).distinct()

In [11]:
print(f'There are {categories.count()} categories')

There are 42 categories


## c. How many commercial chains are being monitored?

In [12]:
commercial_chain = df.select(['cadenaComercial']).distinct()

In [13]:
print(f'There are {commercial_chain.count()} commercial chains')

There are 706 commercial chains


## d. How could you ensure the data quality? Did you detect any inconsistence in the data or error in the source?

A: The easiest way is to ensure you are downloading it from a reliable source, in this case, the data had a gold seal that stated it's quality. 

Then we can perform different things to ensure its quality, for example look if there are missing values (empty cells) or outliers in the data. 

A thing that I have seen in this exercise and the previous one is that accents from spanish can cause troubles when are casted into strings due to the different codification they use.

Another thing I have encountered is the correct classification of data, for example in the estado column there are three entries that does not correspond to a valid mexican state. 

## e. Which products are monitored the most per state?

In [14]:
chains_products_df = df.select(['estado', 'producto','presentacion','marca']).distinct().groupby(
    ['estado','producto']).count().sort(f.col('estado').asc(), f.col('count').desc()).dropna()


In [15]:
# Filter COL. EDUARDO GUERRA, estado and from estado column
not_a_state = ['COL. EDUARDO GUERRA', 'estado', 'ESQ. SUR 125"']
# Ensure that varios is not a product
chains_products_df = chains_products_df.where(chains_products_df['producto']!='VARIOS')\
.where(chains_products_df['estado'] != 'COL. EDUARDO GUERRA')\
.where(chains_products_df['estado'] != 'estado')\
.where(chains_products_df['estado'] != 'ESQ. SUR 125"') #this could be probably done in a more efficient way using a udf, I am not sure why this is not filtering correctly
# Ensure that estados is a real mexican state
# Filter COL. EDUARDO GUERRA, estado and ESQ. SUR 125" from estado column

In [16]:
chains_products_df.select('estado').distinct().show(35)


+--------------------+
|              estado|
+--------------------+
|        QUINTANA ROO|
|          NUEVO LEÓN|
|             TABASCO|
|             SINALOA|
|     BAJA CALIFORNIA|
|            TLAXCALA|
|COAHUILA DE ZARAGOZA|
|       ESQ. SUR 125"|
| COL. EDUARDO GUERRA|
|             CHIAPAS|
|VERACRUZ DE IGNAC...|
|              SONORA|
|             YUCATÁN|
| MICHOACÁN DE OCAMPO|
|             DURANGO|
|            GUERRERO|
|             NAYARIT|
|           CHIHUAHUA|
|    DISTRITO FEDERAL|
|             HIDALGO|
|           ZACATECAS|
|          GUANAJUATO|
|          TAMAULIPAS|
|     SAN LUIS POTOSÍ|
|             MORELOS|
|      AGUASCALIENTES|
|              OAXACA|
|              PUEBLA|
| BAJA CALIFORNIA SUR|
|             JALISCO|
|            CAMPECHE|
|           QUERÉTARO|
|              COLIMA|
|              MÉXICO|
+--------------------+



In [17]:
partition = Window.partitionBy("estado").orderBy(f.col('count').desc())

In [18]:
chains_products_df.withColumn('rn', f.row_number().over(partition)).where(f.col('rn') == 1).drop('rn').show()

+--------------------+----------------+-----+
|              estado|        producto|count|
+--------------------+----------------+-----+
|        QUINTANA ROO|         MUÑECAS|  259|
|          NUEVO LEÓN|         MUÑECAS|  330|
|             SINALOA|         MUÑECAS|  309|
|             TABASCO|         MUÑECAS|  320|
|     BAJA CALIFORNIA|       PANTALLAS|  124|
|            TLAXCALA|         MUÑECAS|  272|
|COAHUILA DE ZARAGOZA|         MUÑECAS|  221|
|       ESQ. SUR 125"|TORTILLA DE MAIZ|    1|
| COL. EDUARDO GUERRA|         MUÑECAS|   19|
|             CHIAPAS|       PANTALLAS|  122|
|VERACRUZ DE IGNAC...|         MUÑECAS|  170|
|              SONORA|         MUÑECAS|  292|
|             YUCATÁN|         MUÑECAS|  324|
| MICHOACÁN DE OCAMPO|         MUÑECAS|  323|
|             DURANGO|       PANTALLAS|  165|
|            GUERRERO|       PANTALLAS|  116|
|             NAYARIT|       PANTALLAS|  117|
|           CHIHUAHUA|       PANTALLAS|  136|
|    DISTRITO FEDERAL|         MUÑ

## f. Which commercial chain has the most variety of monitored products?

In [19]:
# As the data is reported daily, most of the products will repeat many times, that's why we need to take
# into account "producto", "presentacion", "marca" and then apply the distinct() method that creates a mathematical
# set where there's only one product per entry
chains_products_df = df.select(['cadenaComercial', 'producto','presentacion','marca']).distinct().groupby(
    ['cadenaComercial','producto','presentacion','marca']).count()
chains_products_df.select('cadenaComercial', 'count').groupby('cadenaComercial').sum().orderBy('sum(count)', ascending=False).show()

+--------------------+----------+
|     cadenaComercial|sum(count)|
+--------------------+----------+
|            WAL-MART|      7371|
|MEGA COMERCIAL ME...|      7072|
|             SORIANA|      6965|
|            CHEDRAUI|      6864|
|  COMERCIAL MEXICANA|      6645|
|      BODEGA AURRERA|      6202|
|        SORIANA PLUS|      5560|
|HIPERMERCADO SORIANA|      5245|
|BODEGA COMERCIAL ...|      5212|
|     MERCADO SORIANA|      5140|
|           LIVERPOOL|      4584|
|              H.E.B.|      4480|
|  LEY (AUTOSERVICIO)|      4117|
|SEARS ROEBUCK DE ...|      4078|
|       SORIANA SUPER|      3999|
|MUEBLERIA FABRICA...|      3934|
|            CASA LEY|      3877|
|   PALACIO DE HIERRO|      3166|
|              COPPEL|      2752|
|            SUPERAMA|      2557|
+--------------------+----------+
only showing top 20 rows



Wal-Mart is the chain with the most monitored products

# 2.- Exploratory analysis

## a) Generate a basic basket that allows you to compare prices geographically and temporarily. Justify your election and procedure.

In [20]:
# first we need to choose the products that will put together our basic basket.
# Then we will choose the prices based on the day and state and average throughout commercial chains


# 3.- Visualization

## a) Create a map that allow us to identify the different offer in categories in Leon, Guanajuato and the price level in each one. Bonus points if the map is interactive.

In [21]:
zm_guanajuato=[ 'LEON', 'SILAO DE LA VICTORIA', 'SAN FRANCISCO DEL RINCON', 'PURISIMA DEL RINCON']

In [45]:
df.select(f.max('fechaRegistro')).show()
# The max registered date is:
#+-------------------+
#| max(fechaRegistro)|
#+-------------------+
#|2016-04-29 17:47:24|
#+-------------------+

+-------------------+
| max(fechaRegistro)|
+-------------------+
|2016-04-29 17:47:24|
+-------------------+



In [60]:
#First filter by state and municipios and take prices from 2015
zm_categories_df = df.select(['estado','municipio', 'cadenaComercial', 'categoria','precio','fechaRegistro','latitud', 'longitud'])

zm_categories_df = zm_categories_df.where(
    zm_categories_df['estado'] == 'GUANAJUATO').where(
    zm_categories_df['municipio'] == zm_guanajuato[0]).where(
    zm_categories_df['municipio'] == zm_guanajuato[1]).where(
    zm_categories_df['municipio'] == zm_guanajuato[2]).where(
    (zm_categories_df['fechaRegistro'] < '2016-01-01 00:00:00') & (zm_categories_df['fechaRegistro'] >= '2015-01-01 00:00:00'))

In [63]:
zm_categories_df.write.format('csv').option('header', True).mode('overwrite').option('sep', ',')\
    .save(f'/zm_Guanajuato_Categorias/')

Py4JJavaError: An error occurred while calling o565.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 85.0 failed 1 times, most recent failure: Lost task 0.0 in stage 85.0 (TID 4098, 192.168.0.17, executor driver): java.io.IOException: Mkdirs failed to create file:/zm_Guanajuato_Categorias/_temporary/0/_temporary/attempt_20201018151955_0085_m_000000_4098 (exists=false, cwd=file:/home/ernesto/Documents/ExamenOPICienciaDatos)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:455)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:84)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:264)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
	... 32 more
Caused by: java.io.IOException: Mkdirs failed to create file:/zm_Guanajuato_Categorias/_temporary/0/_temporary/attempt_20201018151955_0085_m_000000_4098 (exists=false, cwd=file:/home/ernesto/Documents/ExamenOPICienciaDatos)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:455)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:84)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:264)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [62]:

zm_categories_df = zm_categories_df.groupby(
    'cadenaComercial', 'categoria','latitud','longitud').mean('precio')

Py4JJavaError: An error occurred while calling o558.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Job 29 cancelled 
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1955)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2205)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
	... 32 more


In [1]:
from ipyleaflet import Map, basemaps
Map(center = (60, -2.2), zoom = 2, min_zoom = 1, max_zoom = 20, 
    basemap=basemaps.Stamen.Terrain)

Map(center=[60, -2.2], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title', 'zoom_out_t…