In [None]:
!pip install -q kafka-python
!pip install confluent_kafka
!pip install pyspark



In [None]:
# Baixar e configurar Kafka
!wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
!tar -xzf kafka_2.12-2.8.0.tgz

# Iniciar Zookeeper
!kafka_2.12-2.8.0/bin/zookeeper-server-start.sh -daemon kafka_2.12-2.8.0/config/zookeeper.properties

# Iniciar Kafka
!kafka_2.12-2.8.0/bin/kafka-server-start.sh -daemon kafka_2.12-2.8.0/config/server.properties


--2024-08-26 20:46:12--  https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 71542357 (68M) [application/x-gzip]
Saving to: ‘kafka_2.12-2.8.0.tgz.8’


2024-08-26 20:46:15 (21.7 MB/s) - ‘kafka_2.12-2.8.0.tgz.8’ saved [71542357/71542357]



In [None]:
import time
import requests
import os
import json
from datetime import datetime

# Cria a pasta onde os arquivos serão salvos
folder_path = "dados_mercadobitcoin"
os.makedirs(folder_path, exist_ok=True)

# Define a URL da API
url = "https://www.mercadobitcoin.net/api/BTC/ticker/"


# Define o tempo total (1 minuto) e o intervalo entre as requisições
total_time = 30  # 1 minuto
interval = 5  # intervalo entre as requisições (em segundos)

start_time = time.time()

while (time.time() - start_time) < total_time:
    try:
        # Faz a requisição para a API
        response = requests.get(url)
        data = response.json()

        # Gera um timestamp para o nome do arquivo
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_name = f"{timestamp}.json"

        # Salva os dados em um arquivo
        file_path = os.path.join(folder_path, file_name)
        with open(file_path, 'w') as file:
            json.dump(data, file)

        print(f"Dados salvos em {file_path}")

    except Exception as e:
        print(f"Erro ao coletar dados: {e}")

    # Espera o tempo definido antes de fazer a próxima requisição
    time.sleep(interval)


Dados salvos em dados_mercadobitcoin/20240826_204623.json
Dados salvos em dados_mercadobitcoin/20240826_204628.json
Dados salvos em dados_mercadobitcoin/20240826_204633.json
Dados salvos em dados_mercadobitcoin/20240826_204638.json
Dados salvos em dados_mercadobitcoin/20240826_204643.json
Dados salvos em dados_mercadobitcoin/20240826_204648.json


In [None]:
from kafka import KafkaProducer
import json
import os

# Configurações do Kafka
kafka_bootstrap_servers = 'localhost:9092'  # Altere conforme necessário
kafka_topic = 'bitcoin_ticker'

# Inicializa o produtor Kafka
producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Caminho da pasta com os dados
folder_path = "dados_mercadobitcoin"

# Envia os arquivos JSON para o Kafka
for file_name in os.listdir(folder_path):
    if file_name.endswith('.json'):
        file_path = os.path.join(folder_path, file_name)
        with open(file_path, 'r') as file:
            data = json.load(file)
            producer.send(kafka_topic, data)
            print(f"Dados do arquivo {file_name} enviados para o Kafka.")

producer.flush()
producer.close()


Dados do arquivo 20240826_194318.json enviados para o Kafka.
Dados do arquivo 20240826_202052.json enviados para o Kafka.
Dados do arquivo 20240826_204648.json enviados para o Kafka.
Dados do arquivo 20240826_201705.json enviados para o Kafka.
Dados do arquivo 20240826_204628.json enviados para o Kafka.
Dados do arquivo 20240826_194638.json enviados para o Kafka.
Dados do arquivo 20240826_204307.json enviados para o Kafka.
Dados do arquivo 20240826_204638.json enviados para o Kafka.
Dados do arquivo 20240826_204633.json enviados para o Kafka.
Dados do arquivo 20240826_201720.json enviados para o Kafka.
Dados do arquivo 20240826_195440.json enviados para o Kafka.
Dados do arquivo 20240826_204317.json enviados para o Kafka.
Dados do arquivo 20240826_204643.json enviados para o Kafka.
Dados do arquivo 20240826_204623.json enviados para o Kafka.
Dados do arquivo 20240826_202102.json enviados para o Kafka.
Dados do arquivo 20240826_202057.json enviados para o Kafka.
Dados do arquivo 2024082

In [None]:
%%writefile consumidor.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Cria a sessão do Spark
spark = SparkSession.builder.appName("BitcoinTickerProcessor") .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0").getOrCreate()

# Definindo o schema para os dados JSON, com o campo "data" em vez de "date"
schema = StructType([
    StructField("ticker", StructType([
        StructField("high", StringType(), True),
        StructField("low", StringType(), True),
        StructField("vol", StringType(), True),
        StructField("last", StringType(), True),
        StructField("buy", StringType(), True),
        StructField("sell", StringType(), True),
        StructField("data", LongType(), True)  # Campo alterado para "data"
    ]))
])

# Lê os dados do Kafka
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "bitcoin_ticker").load()

# Converte os dados de JSON para colunas
df = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.ticker.*")

# Configura o caminho de saída para os arquivos CSV
output_path = "/content/output_csv"

# Escreve o DataFrame processado em arquivos CSV
query = df.writeStream \
    .outputMode("append") \
    .format("csv") \
    .option("path", output_path) \
    .option("checkpointLocation", "checkpoint/") \
    .start()

query.awaitTermination(10)


Overwriting consumidor.py


In [None]:
!spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 consumidor.py


:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1f9f4689-189e-4b7d-aca5-f29dbd7f8c5b;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.

In [None]:
# MongoDB download and installation
!wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-3.0.15.tgz  # Downloads MongoDB from official repository
!tar xfv /content/mongodb-linux-x86_64-3.0.15.tgz    # Unpack compressed file
#!rm mongodb-linux-x86_64-debian71-3.0.15.tgz          # Removes downloaded file

# Default location of database is "/data/db" folder
!mkdir /data                                          # data folder creation
!mkdir /data/db

--2024-08-26 20:47:21--  https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-3.0.15.tgz
Resolving fastdl.mongodb.org (fastdl.mongodb.org)... 18.160.18.101, 18.160.18.6, 18.160.18.8, ...
Connecting to fastdl.mongodb.org (fastdl.mongodb.org)|18.160.18.101|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 63276568 (60M) [application/x-gzip]
Saving to: ‘mongodb-linux-x86_64-3.0.15.tgz.7’


2024-08-26 20:47:21 (279 MB/s) - ‘mongodb-linux-x86_64-3.0.15.tgz.7’ saved [63276568/63276568]

mongodb-linux-x86_64-3.0.15/README
mongodb-linux-x86_64-3.0.15/THIRD-PARTY-NOTICES
mongodb-linux-x86_64-3.0.15/GNU-AGPL-3.0
mongodb-linux-x86_64-3.0.15/bin/mongodump
mongodb-linux-x86_64-3.0.15/bin/mongorestore
mongodb-linux-x86_64-3.0.15/bin/mongoexport
mongodb-linux-x86_64-3.0.15/bin/mongoimport
mongodb-linux-x86_64-3.0.15/bin/mongostat
mongodb-linux-x86_64-3.0.15/bin/mongotop
mongodb-linux-x86_64-3.0.15/bin/bsondump
mongodb-linux-x86_64-3.0.15/bin/mongofiles
mongodb-linux-x86_64-3.

In [None]:

11# Runs mongoDB server
#!mongodb-linux-x86_64-debian71-3.0.15/bin/mongod --nojournal --dbpath GDrive/My\ Drive/data/db
!nohup /content/mongodb-linux-x86_64-3.0.15/bin/mongod --nojournal --dbpath /data/db >log1 &

#import subprocess
#subprocess.Popen(["./mongodb-linux-x86_64-debian71-3.0.15/bin/mongod"," --nojournal --dbpath /data/db"])


nohup: redirecting stderr to stdout


In [None]:
!pip install pymongo==3.10.0 pandas



In [None]:
import os
import pandas as pd

# Nome das colunas
columns = ['high', 'low', 'vol', 'last', 'buy', 'sell', 'data']

# Caminho da pasta contendo os arquivos CSV
folder_path = '/content/output_csv/'

# Iterar sobre todos os arquivos na pasta
for filename in os.listdir(folder_path):
    if filename.endswith('.csv'):
        file_path = os.path.join(folder_path, filename)

        # Ler o arquivo CSV
        df = pd.read_csv(file_path, header=None)

        # Atribuir os nomes das colunas
        df.columns = columns

        # Salvar o arquivo CSV atualizado
        df.to_csv(file_path, index=False)

        print(f"Arquivo atualizado: {filename}")


Arquivo atualizado: part-00000-b2f20a26-95a8-41cd-8b7f-25e2218b8356-c000.csv


In [None]:

from pyspark.sql import SparkSession
from pymongo import MongoClient
import os
import glob

# Definir o caminho da pasta contendo arquivos CSV
csv_folder_path = '/content/output_csv/'

# Passo 3: Iniciar uma sessão PySpark
spark = SparkSession.builder \
    .appName("CSV to MongoDB") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/yourdb.yourcollection") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/yourdb.yourcollection") \
    .getOrCreate()

# Passo 4: Obter a lista de arquivos CSV na pasta
csv_files = glob.glob(os.path.join(csv_folder_path, '*.csv'))

if not csv_files:
    raise FileNotFoundError(f"Nenhum arquivo CSV encontrado na pasta: {csv_folder_path}")

# Passo 5: Conectar ao MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['yourdb']
collection = db['yourcollection']

# Função para substituir pontos nas chaves do dicionário
def replace_dot_keys(data):
    if isinstance(data, dict):
        return {k.replace('.', '_'): replace_dot_keys(v) for k, v in data.items()}
    elif isinstance(data, list):
        return [replace_dot_keys(item) for item in data]
    else:
        return data

# Processar cada arquivo CSV
for csv_file in csv_files:
    print(f"Processando arquivo: {csv_file}")

    # Ler o arquivo CSV usando PySpark
    df = spark.read.csv(csv_file, header=True, inferSchema=True)
    df.show()

    # Converter o DataFrame do Spark para uma lista de dicionários
    data = df.toPandas().to_dict(orient='records')

    # Substituir pontos nas chaves dos dicionários
    data = replace_dot_keys(data)

    # Inserir os dados no MongoDB
    collection.insert_many(data)

# Verificar os dados inseridos
inserted_data = collection.find()
for document in inserted_data:
    print(document)



Processando arquivo: /content/output_csv/part-00000-b2f20a26-95a8-41cd-8b7f-25e2218b8356-c000.csv


Py4JJavaError: An error occurred while calling o31.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (a07effacf6e3 executor driver): org.apache.spark.SparkException: Encountered error while reading file file:///content/output_csv/part-00000-b2f20a26-95a8-41cd-8b7f-25e2218b8356-c000.csv. Details:
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:864)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.ChecksumException: Checksum error: file:/content/output_csv/part-00000-b2f20a26-95a8-41cd-8b7f-25e2218b8356-c000.csv at 0 exp: -210322563 got: 1874713976
	at org.apache.hadoop.fs.FSInputChecker.verifySums(FSInputChecker.java:347)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:303)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:158)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:198)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:67)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:283)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4334)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4324)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4322)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4322)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:111)
	at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:64)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:62)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:208)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:205)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:407)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Encountered error while reading file file:///content/output_csv/part-00000-b2f20a26-95a8-41cd-8b7f-25e2218b8356-c000.csv. Details:
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:864)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.hadoop.fs.ChecksumException: Checksum error: file:/content/output_csv/part-00000-b2f20a26-95a8-41cd-8b7f-25e2218b8356-c000.csv at 0 exp: -210322563 got: 1874713976
	at org.apache.hadoop.fs.FSInputChecker.verifySums(FSInputChecker.java:347)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:303)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:158)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:198)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:67)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:283)
	... 22 more
