## Import

In [590]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, col, isnull
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql.functions import regexp_replace, monotonically_increasing_id
from pyspark.sql.functions import trim

from pyspark import SparkConf
from pyspark import SparkContext

#### Dataframe 
Native Spark: if you’re using Spark data frames and libraries (e.g. MLlib), then your code we’ll be parallelized and distributed natively by Spark.

In [591]:
spark = SparkSession \
    .builder \
    .appName("Challenge Data Enginnering") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [592]:
# spark is an existing SparkSession
df = spark.read.text("/home/campos/projects/challenge-data-engineer/dataset/")

In [593]:
df.show(n=10, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------
 value | Project Gutenberg's Etext of Shakespeare's First Folio/35 Plays  
-RECORD 1-----------------------------------------------------------------
 value | This is our 3rd edition of most of these plays.  See the index.  
-RECORD 2-----------------------------------------------------------------
 value |                                                                  
-RECORD 3-----------------------------------------------------------------
 value |                                                                  
-RECORD 4-----------------------------------------------------------------
 value | Copyright laws are changing all over the world, be sure to check 
-RECORD 5-----------------------------------------------------------------
 value | the copyright laws for your country before posting these files!! 
-RECORD 6-----------------------------------------------------------------
 value |                 

In [594]:
print((df.count(), len(df.columns)))

(659482, 1)


In [595]:
print(df.columns)

['value']


In [596]:
type(df)

pyspark.sql.dataframe.DataFrame

In [597]:
df.collect()[10]

Row(value='')

In [598]:
print(df.explain())

== Physical Plan ==
*(1) FileScan text [value#1222] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/home/campos/projects/challenge-data-engineer/dataset], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
None


---

## Lower case

In [599]:
df = df.withColumn('value', lower(col('value')))

In [600]:
df.show(n=15, truncate=False)

+----------------------------------------------------------------+
|value                                                           |
+----------------------------------------------------------------+
|project gutenberg's etext of shakespeare's first folio/35 plays |
|this is our 3rd edition of most of these plays.  see the index. |
|                                                                |
|                                                                |
|copyright laws are changing all over the world, be sure to check|
|the copyright laws for your country before posting these files!!|
|                                                                |
|please take a look at the important information in this header. |
|we encourage you to keep this file on your own disk, keeping an |
|electronic path open for the next readers.  do not remove this. |
|                                                                |
|                                                             

---

## Remove Special Caracters

In [601]:
df = df.withColumn('value', regexp_replace(str=col('value'),
                                           pattern="[^a-z ]",
                                           replacement=""))

df.collect()[14]

Row(value='etexts readable by both humans and by computers since ')

In [602]:
(type(df))

pyspark.sql.dataframe.DataFrame

---

## Remove whitespace

In [603]:
df = df.withColumn('value', trim(col('value')))

df.collect()[14]

Row(value='etexts readable by both humans and by computers since')

---

## Remove Missing Values

In [604]:
df = df.filter(df['value'] != "")
df.show(n=5, truncate=False)

+---------------------------------------------------------------+
|value                                                          |
+---------------------------------------------------------------+
|project gutenbergs etext of shakespeares first folio plays     |
|this is our rd edition of most of these plays  see the index   |
|copyright laws are changing all over the world be sure to check|
|the copyright laws for your country before posting these files |
|please take a look at the important information in this header |
+---------------------------------------------------------------+
only showing top 5 rows



In [605]:
# from pyspark.sql.functions import col, explode, regexp_replace, split

In [606]:
# word_counts = df.rdd.flatMap(lambda word: word.split(' ')).map(lambda word: (word, 1))
# type(word_counts)

# counts = df.rdd.flatMap(lambda line: line.split(" ")) \
#              .map(lambda word: (word, 1)) \
#              .reduceByKey(lambda a, b: a + b)

In [607]:
df = df.withColumn('value', split('value', " "))
df.show(n=15, truncate=False)

+----------------------------------------------------------------------------+
|value                                                                       |
+----------------------------------------------------------------------------+
|[project, gutenbergs, etext, of, shakespeares, first, folio, plays]         |
|[this, is, our, rd, edition, of, most, of, these, plays, , see, the, index] |
|[copyright, laws, are, changing, all, over, the, world, be, sure, to, check]|
|[the, copyright, laws, for, your, country, before, posting, these, files]   |
|[please, take, a, look, at, the, important, information, in, this, header]  |
|[we, encourage, you, to, keep, this, file, on, your, own, disk, keeping, an]|
|[electronic, path, open, for, the, next, readers, , do, not, remove, this]  |
|[welcome, to, the, world, of, free, plain, vanilla, electronic, texts]      |
|[etexts, readable, by, both, humans, and, by, computers, since]             |
|[these, etexts, prepared, by, hundreds, of, volunte

In [608]:
df.collect()[14]

Row(value=['july', '', '', 'etext'])

## Expand List to Rows

In [609]:
# Returns a new row for each element in the given array or map.
# explode()

df_word = df.select(explode(df['value']))

In [610]:
type(df_word)

pyspark.sql.dataframe.DataFrame

In [611]:
df_word.collect()[0]

Row(col='project')

---

## Drop Duplicate

In [612]:
print((df_word.count(), len(df_word.columns)))

(5685857, 1)


In [613]:
df_word = df_word.sort(df_word.columns)

In [614]:
df_word = df_word.drop_duplicates()

In [615]:
type(df_word)

pyspark.sql.dataframe.DataFrame

In [616]:
print((df_word.count(), len(df_word.columns)))

(117239, 1)


In [617]:
df_word.show(n=10)

+------------+
|         col|
+------------+
|            |
|           a|
|          aa|
|        aaah|
|       aabba|
|      aachen|
|         aah|
|      aahmes|
|aakheperkara|
|       aakku|
+------------+
only showing top 10 rows



In [618]:
type(df_word)

pyspark.sql.dataframe.DataFrame

In [619]:
# df_word = df_word.rdd.zipWithIndex()
# df_word = df_word.toDF()

In [620]:
df_word.count()

117239

In [621]:
list_word = list(df_word)
list_word

[Column<b'col'>]

In [622]:
dict_word = {}
list_word = list(df_word)

for row in range(df_word.count()):
    dict_word[row] = df.collect()[row]

Py4JJavaError: An error occurred while calling o1572.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4569.0 failed 1 times, most recent failure: Lost task 4.0 in stage 4569.0 (TID 40665, localhost, executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
	at sun.reflect.GeneratedMethodAccessor57.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
print(dict_word)

Por que modelei as funcoes desta forma?
Quebrei o principio de responsabilidade única?

Caso fizesse uma função chamando cada método isso não faria muito sentido
Desta forma eu trabalho mais um nível de engenharia dos métodos de um dataframe.

In [1]:
Por que salvei os dict em parquet?



Object `parquet` not found.
