# EE226 - Coding 1
## Wordcount in Spark

### Setup

Let's set up Spark in your Kaggle environment. Run the cell below!

In [1]:
!pip install pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.1.1.tar.gz (212.3 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=6c5aff08bd9db583b1d8eb4af32b662055042b8bb09990d958f18fef1abb94f8
  Stored in directory: /root/.cache/pip/wheels/43/47/42/bc413c760cf9d3f7b46ab7cd6590e8c47ebfd19a7386cd4a57
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspar

In [2]:
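# Import required packages (explicit imports avoid shadowing Python built-ins such as sum and max)
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import desc, explode, lower, split, substring
from pyspark.sql.types import StringType
from pyspark import SparkContext
import pandas as pd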

# Create the Spark Session
spark = SparkSession.builder.getOrCreate()

# Optional settings are shown below (commented out; getOrCreate() above uses the defaults).

# spark = SparkSession.builder\
#         .master("local[4]")\
#         .appName("pyspark_study")\
#         .config("spark.driver.memory","1g")\
#         .config("spark.executor.memory","1g")\
#         .config("spark.executor.cores","2")\
#         .config("spark.cores.max","5")\
#         .getOrCreate()

# Create the Spark Context
sc = spark.sparkContext

In [3]:
# Display the Spark session info; the pip install above provides Spark 3.1.1
spark

In [4]:
# Read the dataset into a DataFrame with a single string column "value"; each row holds one line (sentence) of the file
text = spark.read.text("../input/wikipedia-sentences/wikisent2.txt")
text.show()

+--------------------+
|               value|
+--------------------+
|0.000123, which c...|
|000webhost is a f...|
|0010x0010 is a Du...|
|0-0-1-3 is an alc...|
|0.01 is the debut...|
|001 of 3 February...|
|003230 is a South...|
|0.04%Gas molecule...|
|0.04% of the vote...|
|005.1999.06 is th...|
|005 is a 1981 arc...|
|007 Legends is a ...|
|007 Legends is th...|
|007 Racing is a r...|
|00 AM PST and bur...|
|00am to 1:00pm on...|
|0.0 is a live alb...|
|0.0% is considere...|
|00 pm in daily go...|
|00 - The rebels s...|
+--------------------+
only showing top 20 rows



If the setup stage ran successfully, you are ready to work on the DataFrame *text*, generated from the *wikisent2.txt* file, which contains sentences taken from Wikipedia.

### Your task

1. Write a Spark application that outputs the number of occurrences of each word. In your implementation, **ignore letter case**, i.e., convert all words to lower case. You can also ignore all words **starting** with a non-alphabetic character.

**Tip**: preprocess the dataset first, i.e., remove punctuation and digits (0-9), to make your work more effective.
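To check the preprocessing rule on a single sentence without starting Spark, here is a minimal plain-Python sketch that mirrors the DataFrame steps used in the next cell: lower-case, split on single spaces, strip commas and periods, and keep only tokens made entirely of the letters a-z. The helper name `normalize_tokens` is hypothetical, and the sample sentences are invented (the first is only modelled on the truncated rows shown above). Note that the strict `^[a-z]+$` filter also drops tokens such as `china's`, which goes slightly beyond "ignore words starting with a non-alphabetic character".

```python
import re
from typing import List

# Hypothetical helper mirroring the notebook's DataFrame preprocessing steps
WORD_RE = re.compile(r"^[a-z]+$")


def normalize_tokens(sentence: str) -> List[str]:
    """Lower-case, split on single spaces, strip ',' and '.', keep a-z-only tokens."""
    tokens = sentence.lower().split(" ")               # like lower() + split(value, " ")
    tokens = [re.sub(r"[,.]", "", t) for t in tokens]  # like regexp_replace(value, "[,.]", "")
    return [t for t in tokens if WORD_RE.match(t)]     # like rlike("^[a-z]+$")


# Invented sample sentences for illustration
print(normalize_tokens("000webhost is a free web hosting service, operated by Hostinger."))
# ['is', 'a', 'free', 'web', 'hosting', 'service', 'operated', 'by', 'hostinger']
print(normalize_tokens("China's capital, Beijing."))
# ['capital', 'beijing']
```

Double spaces produce empty tokens after `split(" ")`; the regex filter removes those as well.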

In [5]:
# Convert every sentence to lower case, keeping the column name "value"
df = text.select(lower(f.col("value")).alias("value"))
df.printSchema()

# Split each sentence on single spaces and emit one row per token
df1 = df.withColumn("value_1", explode(split("value", " ")))

# Strip commas and periods attached to a token (e.g. "service," -> "service")
df2 = df1.select(f.regexp_replace(f.col("value_1"), "[,.]", "").alias("value"))

# Keep only tokens made entirely of the letters a-z. This also drops empty
# strings and tokens containing digits, hyphens, apostrophes, brackets, etc.
df3 = df2.filter(df2.value.rlike("^[a-z]+$"))
df3.show()

root
 |-- value: string (nullable = true)

+-----------+
|      value|
+-----------+
|      which|
|corresponds|
|         to|
|          a|
|   distance|
|         of|
|        mly|
|         or|
|        mpc|
|         is|
|          a|
|       free|
|        web|
|    hosting|
|    service|
|   operated|
|         by|
|  hostinger|
|         is|
|          a|
+-----------+
only showing top 20 rows



In [6]:
# Map: emit (word, 1) for every word; reduce: add up the 1s for each word
output = df3.rdd.map(lambda row: (row.value, 1)).reduceByKey(lambda a, b: a + b)

# Convert the (word, count) pairs back into a DataFrame with named columns
output = output.toDF(["value", "count"])
output.show()

+----------+------+
|     value| count|
+----------+------+
|  distance|  9272|
|        an|852941|
|         e| 15227|
|   college| 95203|
|     debut| 49805|
|     album|242591|
|    korean| 12428|
|       uhm|    18|
|      sega|  2019|
| featuring| 19318|
|    fourth| 30012|
|operations| 19499|
|      home| 61414|
|   biggest|  6470|
|     bunch|   461|
|   talents|  1009|
|      live| 46764|
|   perform|  7004|
|      name|128629|
|      they|178412|
+----------+------+
only showing top 20 rows
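The `map`/`reduceByKey` step above can be pictured on a toy list. A minimal plain-Python sketch of the same idea: the map phase emits `(word, 1)` pairs, and the reduce phase folds together the values of each key with the same `lambda a, b: a + b`. The function name `reduce_by_key` is hypothetical, and this single-process version does not model Spark's partitions or shuffle.

```python
from typing import Callable, Dict, Hashable, Iterable, Tuple


def reduce_by_key(pairs: Iterable[Tuple[Hashable, int]],
                  func: Callable[[int, int], int]) -> Dict[Hashable, int]:
    """Fold together all values that share a key, in the spirit of RDD.reduceByKey."""
    result: Dict[Hashable, int] = {}
    for key, value in pairs:
        # The first value for a key is kept as-is; later values are combined with func
        result[key] = func(result[key], value) if key in result else value
    return result


# Toy token list (not from the real dataset)
tokens = ["is", "a", "free", "web", "hosting", "service", "is", "a", "a"]
pairs = [(word, 1) for word in tokens]             # map phase: word -> (word, 1)
counts = reduce_by_key(pairs, lambda a, b: a + b)  # reduce phase: add up the 1s
print(counts)
# {'is': 2, 'a': 3, 'free': 1, 'web': 1, 'hosting': 1, 'service': 1}
```

In Spark the reduce function may be applied in any order within and across partitions, so it should be associative and commutative; addition is both.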



2. After counting the words, solve the problems below.

Using a DataFrame is recommended: convert your word counts to a Spark DataFrame and apply the related functions to get the results easily.

1) Which word appears the most? Write code to output your result.

In [7]:
# Cache the word counts, since they are reused by the following queries
output.cache()

# Sort by count in descending order and show the top row
sorted_output = output.sort(desc("count"))
sorted_output.show(1)

+-----+--------+
|value|   count|
+-----+--------+
|  the|10783122|
+-----+--------+
only showing top 1 row
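The cell above sorts by count and shows the first row. The same question on a small, made-up dictionary of counts is a single `max` call with a key function, as in this plain-Python sketch (the helper `most_frequent` is hypothetical). In Spark, `sorted_output.first()` would similarly return the top `Row` as a value instead of printing it. If several words tie for the highest count, either approach returns only one of them.

```python
from typing import Dict, Tuple


def most_frequent(counts: Dict[str, int]) -> Tuple[str, int]:
    """Return the (word, count) pair with the highest count (raises ValueError if empty)."""
    return max(counts.items(), key=lambda kv: kv[1])


# Made-up counts for illustration only (not the Wikipedia results)
counts = {"the": 12, "an": 5, "album": 3, "china": 2}
print(most_frequent(counts))  # ('the', 12)

# Equivalent sort-then-take-first formulation, matching the Spark cell's approach
print(sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[0])  # ('the', 12)
```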



2) How many times does the word ‘China’ appear? Write code to output your result.

In [8]:
# All words were lower-cased before counting, so look up 'china' rather than 'China'
china_appear = output.where(f.col("value") == "china")
china_appear.show()

+-----+-----+
|value|count|
+-----+-----+
|china|27632|
+-----+-----+



3) Write a block that outputs the number of words starting with each letter (a-z). For every letter, count the total number of (non-unique) words that start with that letter.

In [9]:
# Extract the first character of each word (every word starts with a-z after filtering)
extracted_letter = output.select(substring(output.value, 1, 1).alias("letter"), "count")

# Map/reduce: use the letter as the key and add up the word counts for each letter
letter_startwith = extracted_letter.rdd.map(lambda row: (row[0], row[1])).reduceByKey(lambda a, b: a + b)
letter_startwith = letter_startwith.toDF(["letter", "count"])
letter_startwith.sort(desc("count")).show(26)

+------+--------+
|letter|   count|
+------+--------+
|     t|18830225|
|     a|18589766|
|     i|12505513|
|     o| 9760270|
|     s| 9733911|
|     c| 8129109|
|     w| 6835629|
|     f| 6670944|
|     b| 6565492|
|     p| 6033054|
|     m| 5182604|
|     h| 4706378|
|     r| 4493378|
|     d| 4025353|
|     l| 3418787|
|     e| 3200433|
|     n| 2944063|
|     g| 2445425|
|     u| 1840368|
|     j| 1235458|
|     v| 1191023|
|     k| 1070563|
|     y|  520885|
|     q|  178096|
|     z|  145912|
|     x|   44150|
+------+--------+
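The letter totals are a weighted group-by: each distinct word contributes its full count to the bucket of its first letter, so the totals are over word occurrences rather than distinct words. A minimal plain-Python sketch over a small, made-up `(word, count)` table (the function name is hypothetical) also shows a useful sanity check: the per-letter totals must add up to the total number of counted words. In Spark the same aggregation could also be expressed with `groupBy(...).agg(...)` on the first-letter column instead of the RDD API.

```python
from collections import defaultdict
from typing import Dict


def counts_by_first_letter(word_counts: Dict[str, int]) -> Dict[str, int]:
    """Sum the occurrence counts of all words that share a first letter."""
    totals: Dict[str, int] = defaultdict(int)
    for word, count in word_counts.items():
        totals[word[0]] += count  # words are non-empty and already filtered to a-z
    return dict(totals)


# Made-up counts for illustration only
word_counts = {"the": 10, "to": 4, "album": 3, "an": 5, "china": 2}
letters = counts_by_first_letter(word_counts)
print(sorted(letters.items(), key=lambda kv: kv[1], reverse=True))
# [('t', 14), ('a', 8), ('c', 2)]

# Sanity check: letter totals add up to the total number of word occurrences
assert sum(letters.values()) == sum(word_counts.values())
```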

