In [1]:
# findspark.init() makes the pyspark package importable in this kernel,
# so it has to run before the pyspark imports in the cells below.
import findspark
findspark.init()

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Reuse the active SparkSession if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Smoke test: run a trivial SQL query and display its single row.
df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [5]:
# Inspect the SparkContext that backs this session.
spark.sparkContext

In [5]:
# Read the text file into a DataFrame with a single string column, `value`.
book = spark.read.text("1342-0.txt")
book

DataFrame[value: string]

In [7]:
# Print the column names and types of `book` as a tree.
book.printSchema()

root
 |-- value: string (nullable = true)



In [8]:
# IPython introspection: a trailing `?` displays the object's type and
# docstring. This is a development aid, not part of the word-count pipeline.
spark.read?

Type:        property
String form: <property object at 0x0000026CFE7273D0>
Docstring:
Returns a :class:`DataFrameReader` that can be used to read data
in as a :class:`DataFrame`.

.. versionadded:: 2.0.0

Returns
-------
:class:`DataFrameReader`


In [9]:
# Display the first 20 rows; long values are cut off and end in "...".
book.show()

+--------------------+
|               value|
+--------------------+
|The Project Guten...|
|                    |
|This eBook is for...|
|almost no restric...|
|re-use it under t...|
|with this eBook o...|
|                    |
|                    |
|Title: Pride and ...|
|                    |
| Author: Jane Austen|
|                    |
|Posting Date: Aug...|
|Release Date: Jun...|
|Last Updated: Mar...|
|                    |
|   Language: English|
|                    |
|Character set enc...|
|                    |
+--------------------+
only showing top 20 rows



In [10]:
# Show only 10 rows and raise the truncation limit to 50 characters per value.
book.show(10, truncate=50)

+--------------------------------------------------+
|                                             value|
+--------------------------------------------------+
|The Project Gutenberg EBook of Pride and Prejud...|
|                                                  |
|This eBook is for the use of anyone anywhere at...|
|almost no restrictions whatsoever.  You may cop...|
|re-use it under the terms of the Project Gutenb...|
|    with this eBook or online at www.gutenberg.org|
|                                                  |
|                                                  |
|                        Title: Pride and Prejudice|
|                                                  |
+--------------------------------------------------+
only showing top 10 rows



In [11]:
from pyspark.sql.functions import split

# Break every line of text on single spaces, producing one array of
# tokens per row in a column named "line".
lines = book.select(
    split(book["value"], " ").alias("line")
)
lines.show(5)

+--------------------+
|                line|
+--------------------+
|[The, Project, Gu...|
|                  []|
|[This, eBook, is,...|
|[almost, no, rest...|
|[re-use, it, unde...|
+--------------------+
only showing top 5 rows



In [12]:
# Select the `value` column via attribute access on the DataFrame.
book.select(book.value)

DataFrame[value: string]

In [14]:
from pyspark.sql.functions import col  # needed for the col("value") form used in later cells
# Attribute access on the DataFrame; this form does not itself use `col`.
book.select(book.value)


DataFrame[value: string]

In [16]:
# Select the `value` column via dictionary-style indexing on the DataFrame.
book.select(book["value"])


DataFrame[value: string]

In [17]:
# Select the `value` column by name with col(), without referencing `book`.
book.select(col("value"))

DataFrame[value: string]

In [19]:
from pyspark.sql.functions import col, split
# Without an alias the new column gets the auto-generated name shown in the output.
# NOTE: this rebinds `lines` to the un-aliased frame; a later cell assigns it
# again with the column aliased as "line".
lines = book.select(split(col("value"), " "))
lines

DataFrame[split(value,  , -1): array<string>]

In [21]:
# Inspect the schema of the un-aliased split: an array column of strings.
book.select(split(col("value"), " ")).printSchema()

root
 |-- split(value,  , -1): array (nullable = true)
 |    |-- element: string (containsNull = true)



In [23]:
# Rebuild `lines` with the array column renamed to "line" via alias().
lines = book.select(split(book.value, " ").alias("line"))
lines

DataFrame[line: array<string>]

In [25]:
from pyspark.sql.functions import col, explode

# Flatten each array of tokens so that every token gets a row of its own.
words = lines.select(
    explode(lines["line"]).alias("word")
)
words.show(15)

+----------+
|      word|
+----------+
|       The|
|   Project|
| Gutenberg|
|     EBook|
|        of|
|     Pride|
|       and|
|Prejudice,|
|        by|
|      Jane|
|    Austen|
|          |
|      This|
|     eBook|
|        is|
+----------+
only showing top 15 rows



In [26]:
from pyspark.sql.functions import lower

# Normalise case so that, for example, "The" and "the" end up as the same word.
words_lower = words.select(
    lower(words["word"]).alias("word_lower")
)
words_lower.show()

+----------+
|word_lower|
+----------+
|       the|
|   project|
| gutenberg|
|     ebook|
|        of|
|     pride|
|       and|
|prejudice,|
|        by|
|      jane|
|    austen|
|          |
|      this|
|     ebook|
|        is|
|       for|
|       the|
|       use|
|        of|
|    anyone|
+----------+
only showing top 20 rows



In [27]:
from pyspark.sql.functions import regexp_extract
# Keep only the first run of lowercase letters in each token, which drops
# trailing punctuation (e.g. "prejudice," becomes "prejudice").
# NOTE(review): "[a-z]*" can also match the empty string, so a token that
# begins with a non-letter presumably yields "" here — confirm this is intended.
words_clean = words_lower.select(    regexp_extract(col("word_lower"), "[a-z]*", 0).alias("word")  )
words_clean.show()

+---------+
|     word|
+---------+
|      the|
|  project|
|gutenberg|
|    ebook|
|       of|
|    pride|
|      and|
|prejudice|
|       by|
|     jane|
|   austen|
|         |
|     this|
|    ebook|
|       is|
|      for|
|      the|
|      use|
|       of|
|   anyone|
+---------+
only showing top 20 rows



In [29]:
# Discard the empty strings left over from blank lines and from tokens
# with no leading letters (these are "" values, not nulls).
words_nonull = words_clean.filter(col("word") != "")
words_nonull.show()

+---------+
|     word|
+---------+
|      the|
|  project|
|gutenberg|
|    ebook|
|       of|
|    pride|
|      and|
|prejudice|
|       by|
|     jane|
|   austen|
|     this|
|    ebook|
|       is|
|      for|
|      the|
|      use|
|       of|
|   anyone|
| anywhere|
+---------+
only showing top 20 rows



In [31]:
# Group identical words together, then count the rows in each group.
# `groups` is reused for the count instead of repeating the same groupby.
groups = words_nonull.groupby(col("word"))
results = groups.count()

# The displayed row order is arbitrary because no orderBy is applied here.
results.show()

+-------------+-----+
|         word|count|
+-------------+-----+
|       online|    4|
|         some|  203|
|        still|   72|
|          few|   72|
|         hope|  122|
|        those|   60|
|     cautious|    4|
|    imitation|    1|
|          art|    3|
|      solaced|    1|
|       poetry|    2|
|    arguments|    5|
| premeditated|    1|
|      elevate|    1|
|       doubts|    2|
|    destitute|    1|
|    solemnity|    5|
|   lieutenant|    1|
|gratification|    1|
|    connected|   14|
+-------------+-----+
only showing top 20 rows



In [34]:
# Rank the words from most to least frequent and display the top 10.
results.orderBy(col("count").desc()).show(10)

+----+-----+
|word|count|
+----+-----+
| the| 4480|
|  to| 4218|
|  of| 3711|
| and| 3504|
| her| 2199|
|   a| 1982|
|  in| 1909|
| was| 1838|
|   i| 1750|
| she| 1668|
+----+-----+
only showing top 10 rows



In [35]:
# Collapse to a single partition so Spark emits one part file, then write it
# out. Spark creates a *directory* with this name that contains that file.
# mode("overwrite") lets the cell be re-run; the default save mode raises an
# error when the target path already exists.
results.coalesce(1).write.mode("overwrite").csv("./results_single_partition.csv")

In [38]:
import pyspark.sql.functions as F

# The whole word-count pipeline as a single method chain:
# read lines -> split on spaces -> one row per token -> lowercase ->
# keep the leading run of letters/apostrophes -> drop empties -> count.
results = (
    spark.read.text("1342-0.txt")
    .select(F.split(F.col("value"), " ").alias("line"))
    .select(F.explode(F.col("line")).alias("word"))
    .select(F.lower(F.col("word")).alias("word"))
    .select(F.regexp_extract(F.col("word"), "[a-z']*", 0).alias("word"))
    .where(F.col("word") != "")
    .groupby("word")
    .count()
)

# Sort before displaying: the grouped counts come back in arbitrary order,
# so an explicit orderBy is needed to list the most frequent words first.
results.orderBy("count", ascending=False).show()

+----+-----+
|word|count|
+----+-----+
| the| 4480|
|  to| 4218|
|  of| 3711|
| and| 3504|
| her| 2199|
|   a| 1982|
|  in| 1909|
| was| 1838|
|   i| 1749|
| she| 1668|
|that| 1487|
|  it| 1482|
| not| 1427|
| you| 1300|
|  he| 1296|
|  be| 1257|
| his| 1247|
|  as| 1174|
| had| 1170|
|with| 1092|
+----+-----+
only showing top 20 rows



In [40]:
from pyspark.sql import SparkSession

# Request a session with a descriptive application name. getOrCreate()
# hands back the already-running session when there is one.
# NOTE(review): "occurences" in the app name is a misspelling of "occurrences".
spark = SparkSession.builder.appName("Counting word occurences from a book.").getOrCreate()
spark

In [43]:
def count_words(path):
    """Count lowercase word occurrences in the text file(s) at ``path``.

    Parameters
    ----------
    path : str
        A file path or glob pattern passed straight to ``spark.read.text``
        (for example ``"1342-0.txt"`` or ``"*.txt"``).

    Returns
    -------
    DataFrame
        One row per distinct word, with columns ``word`` and ``count``.
        Rows are not sorted.
    """
    return (
        spark.read.text(path)
        # One array of space-separated tokens per input line.
        .select(F.split(F.col("value"), " ").alias("line"))
        # One row per token.
        .select(F.explode(F.col("line")).alias("word"))
        .select(F.lower(F.col("word")).alias("word"))
        # Keep the leading run of letters/apostrophes; the rest is discarded.
        .select(F.regexp_extract(F.col("word"), "[a-z']*", 0).alias("word"))
        .where(F.col("word") != "")
        .groupby("word")
        .count()
    )


# Word counts across every .txt file in the working directory.
results = count_words("*.txt")
results.orderBy("count", ascending=False).show()

+----+-----+
|word|count|
+----+-----+
| the|38895|
| and|23919|
|  of|21199|
|  to|20526|
|   a|14464|
|   i|13973|
|  in|12777|
|that| 9623|
|  it| 9099|
| was| 8920|
| her| 7923|
|  my| 7385|
| his| 6642|
|with| 6575|
|  he| 6444|
|  as| 6439|
| you| 6295|
| had| 5718|
| she| 5617|
| for| 5425|
+----+-----+
only showing top 20 rows



In [2]:
import os
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Reuse the active SparkSession if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

In [5]:
# Set the SparkContext log level to "ERROR".
spark.sparkContext.setLogLevel("ERROR")