In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

In [27]:
df = spark.createDataFrame([
    (1, 144.5, 5.9, 33, "M"),
    (2, 167.2, 5.4, 45, "M"),
    (3, 124.1, 5.2, 23, "F"),
    (4, 144.5, 5.9, 33, "M"),
    (5, 133.2, 5.7, 54, "F"),
    (3, 124.1, 5.2, 23, "F"),
    (5, 129.2, 5.3, 42, "M")
], ["id", "weight", "height", "age", "gender"])

In [28]:
print(df.count())

7


In [29]:
print(df.distinct().count())

6


In [30]:
df = df.dropDuplicates()

In [31]:
df.orderBy("id").show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [32]:
df.count()

6

In [37]:
df.select([c for c in df.columns if c != "id"]).distinct().count()

5

In [38]:
df = df.dropDuplicates([c for c in df.columns if c != "id"])

In [39]:
df.orderBy("id").show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [40]:
df.select([c for c in df.columns if c == "id"]).distinct().count()

4

In [41]:
df = df.dropDuplicates([c for c in df.columns if c == "id"])

In [42]:
df.orderBy("id").show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 133.2|   5.7| 54|     F|
+---+------+------+---+------+



In [43]:
from pyspark.sql import functions as f

In [44]:
df.agg(
    f.count("id"),
    f.countDistinct("id")
).show()

+---------+------------------+
|count(id)|count(DISTINCT id)|
+---------+------------------+
|        4|                 4|
+---------+------------------+



In [46]:
df.withColumn("nid", f.monotonically_increasing_id()).orderBy("id").show()

+---+------+------+---+------+-------------+
| id|weight|height|age|gender|          nid|
+---+------+------+---+------+-------------+
|  1| 144.5|   5.9| 33|     M| 592705486848|
|  2| 167.2|   5.4| 45|     M|1099511627776|
|  3| 124.1|   5.2| 23|     F| 919123001344|
|  5| 133.2|   5.7| 54|     F| 584115552256|
+---+------+------+---+------+-------------+



In [48]:
df.withColumn("id", f.monotonically_increasing_id()).orderBy("id").show()

+-------------+------+------+---+------+
|           id|weight|height|age|gender|
+-------------+------+------+---+------+
| 584115552256| 133.2|   5.7| 54|     F|
| 592705486848| 144.5|   5.9| 33|     M|
| 919123001344| 124.1|   5.2| 23|     F|
|1099511627776| 167.2|   5.4| 45|     M|
+-------------+------+------+---+------+



In [72]:
df = spark.createDataFrame(
[
    (1, 143.5, 5.6, 28, "M", 100000),
    (2, 167.2, 5.4, 45, "M", None),
    (3, None, 5.2, None, None, None),
    (4, 144.5, 5.9, 33, "M", None),
    (5, 133.2, 5.7, 54, "F", None),
    (6, 124.1, 5.2, None, "F", None),
    (7, 129.2, 5.3, 42, "M", 76000)
],
["id", "weight", "height", "age", "gender", "income"]
)

In [50]:
df.agg(f.count("weight")/f.count("*")).show()

+--------------------------+
|(count(weight) / count(1))|
+--------------------------+
|        0.8571428571428571|
+--------------------------+



In [111]:
df.agg(
    *[(1-(f.count(c)/f.count("*"))).alias(c+"Rate") for c in df.columns]
).show()

+------+----------+----------+-------+
|idRate|weightRate|heightRate|ageRate|
+------+----------+----------+-------+
|   0.0|       0.0|       0.0|    0.0|
+------+----------+----------+-------+



In [56]:
df = df.select([c for c in df.columns if c != "income"])

In [73]:
df.show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  1| 143.5|   5.6|  28|     M|100000|
|  2| 167.2|   5.4|  45|     M|  null|
|  3|  null|   5.2|null|  null|  null|
|  4| 144.5|   5.9|  33|     M|  null|
|  5| 133.2|   5.7|  54|     F|  null|
|  6| 124.1|   5.2|null|     F|  null|
|  7| 129.2|   5.3|  42|     M| 76000|
+---+------+------+----+------+------+



In [58]:
df.dropna(thresh=3).show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



In [82]:
means = df.agg(
    *[f.mean(c).alias(c) for c in df.columns if c != "gender"]
)

In [83]:
means.show()

+---+------------------+-----------------+----+-------+
| id|            weight|           height| age| income|
+---+------------------+-----------------+----+-------+
|4.0|140.28333333333333|5.471428571428571|40.4|88000.0|
+---+------------------+-----------------+----+-------+



In [84]:
meanPandas = means.toPandas()

In [78]:
meanPandas = meanPandas.to_dict()

In [79]:
meanPandas

{'avg(id)': {0: 4.0},
 'avg(weight)': {0: 140.28333333333333},
 'avg(height)': {0: 5.471428571428571},
 'avg(age)': {0: 40.4},
 'avg(income)': {0: 88000.0}}

In [88]:
meanPandas = means.toPandas()

In [89]:
meanPandas = meanPandas.to_dict("records")[0]

In [90]:
meanPandas

{'id': 4.0,
 'weight': 140.28333333333333,
 'height': 5.471428571428571,
 'age': 40.4,
 'income': 88000.0}

In [95]:
df.show()

+---+------------------+------+---+------+------+
| id|            weight|height|age|gender|income|
+---+------------------+------+---+------+------+
|  1|             143.5|   5.6| 28|     M|100000|
|  2|             167.2|   5.4| 45|     M| 88000|
|  3|140.28333333333333|   5.2| 40|  null| 88000|
|  4|             144.5|   5.9| 33|     M| 88000|
|  5|             133.2|   5.7| 54|     F| 88000|
|  6|             124.1|   5.2| 40|     F| 88000|
|  7|             129.2|   5.3| 42|     M| 76000|
+---+------------------+------+---+------+------+



In [94]:
df = df.fillna(meanPandas)

In [96]:
df = spark.createDataFrame(
    [
        (1, 143.5, 5.3, 28),
        (2, 154.2, 5.5, 45),
        (3, 342.3, 5.1, 99),
        (4, 144.5, 5.5, 33),
        (5, 133.2, 5.4, 54),
        (6, 124.1, 5.1, 21),
        (7, 129.2, 5.3, 42),
    ],
    ['id', 'weight', 'height', 'age']
)

In [98]:
df.approxQuantile("weight", [0.25, 0.75], 0.05)

[129.2, 154.2]

In [99]:
quantiles = df.approxQuantile("weight", [0.25, 0.75], 0.05)

In [100]:
IQR = quantiles[1] - quantiles[0]

In [101]:
bounds = [quantiles[0]-1.5*IQR, quantiles[1]+1.5*IQR]

In [102]:
bounds

[91.69999999999999, 191.7]

In [103]:
cols = ["weight", "height", "age"]

In [104]:
bounds = {}

for c in cols:
    quantiles = df.approxQuantile(c, [0.25, 0.75], 0.05)
    IQR = quantiles[1]-quantiles[0]
    bounds[c] = [quantiles[0] - 1.5*IQR, quantiles[1]+1.5*IQR]

In [105]:
bounds

{'weight': [91.69999999999999, 191.7],
 'height': [4.499999999999999, 6.1000000000000005],
 'age': [-11.0, 93.0]}

In [107]:
df.select(df["weight"] < bounds["weight"][0]).show()
df.select(df["weight"] > bounds["weight"][1]).show()

+----------------------------+
|(weight < 91.69999999999999)|
+----------------------------+
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
+----------------------------+

+----------------+
|(weight > 191.7)|
+----------------+
|           false|
|           false|
|            true|
|           false|
|           false|
|           false|
|           false|
+----------------+



In [119]:
outlier = df.select(
    *["id"]+
    [((df[c] < bounds[c][0]) | (df[c] > bounds[c][1])).alias(c+"_O") for c in cols]
)

In [120]:
outlier.show()

+---+--------+--------+-----+
| id|weight_O|height_O|age_O|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+



In [121]:
join = df.join(outlier, on="id")

In [122]:
join.show()

+---+------+------+---+--------+--------+-----+
| id|weight|height|age|weight_O|height_O|age_O|
+---+------+------+---+--------+--------+-----+
|  7| 129.2|   5.3| 42|   false|   false|false|
|  6| 124.1|   5.1| 21|   false|   false|false|
|  5| 133.2|   5.4| 54|   false|   false|false|
|  1| 143.5|   5.3| 28|   false|   false|false|
|  3| 342.3|   5.1| 99|    true|   false| true|
|  2| 154.2|   5.5| 45|   false|   false|false|
|  4| 144.5|   5.5| 33|   false|   false|false|
+---+------+------+---+--------+--------+-----+



In [124]:
join.filter("!weight_O").select("id", "weight").show()

+---+------+
| id|weight|
+---+------+
|  7| 129.2|
|  6| 124.1|
|  5| 133.2|
|  1| 143.5|
|  2| 154.2|
|  4| 144.5|
+---+------+



In [125]:
from pyspark.streaming import StreamingContext

In [126]:
from pyspark import SparkContext

In [129]:
sc = SparkContext.getOrCreate()
streaming = StreamingContext(sc, 5)

In [130]:
streaming

<pyspark.streaming.context.StreamingContext at 0x1047b9160>

In [131]:
line = streaming.socketTextStream("localhost", 10080)

In [133]:
words = line.flatMap(lambda line:line.split())

In [134]:
pairs = words.map(lambda w:(w,1))

In [135]:
type(pairs)

pyspark.streaming.dstream.TransformedDStream

In [136]:
count = pairs.reduceByKey(lambda a,b:a+b)

In [137]:
count.pprint()

In [140]:
streaming.start()
streaming.awaitTermination()
#terminal nc -lk 10080

Py4JJavaError: An error occurred while calling o803.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/usr/local/lib/python3.7/site-packages/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/usr/local/lib/python3.7/site-packages/pyspark/rdd.py", line 1358, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/local/lib/python3.7/site-packages/pyspark/context.py", line 1001, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 246.0 failed 1 times, most recent failure: Lost task 0.0 in stage 246.0 (TID 9366, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/lychan/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/Users/lychan/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/lychan/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/lychan/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/Users/lychan/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/lychan/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


In [141]:
from pyspark.sql import SparkSession

In [142]:
spark = SparkSession.builder.master("local[2]").appName("Streaming").getOrCreate()

In [143]:
spark

In [147]:
lines = spark.readStream.format("socket")\
    .option("host", "localhost")\
    .option("port", "9999")\
    .load()

In [145]:
from pyspark.sql.functions import explode, split

In [148]:
words = lines.select(
    explode(split(lines.value, " ")).alias("word")
)

In [149]:
wordCount = words.groupBy("word").count()

In [151]:
stream = wordCount.writeStream \
                                .outputMode("complete") \
                                .format("console") \
                                .start()

stream.awaitTermination()
#terminal nc -lk 9999

KeyboardInterrupt: 