In [1]:
import os

In [2]:
os.environ["JAVA_HOME"] = "C:/Program Files/Java/jre1.8.0_291"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

In [3]:
import findspark
findspark.init()
import pyspark

In [4]:
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = (SparkSession
  .builder
  .appName("Streaming")
  .getOrCreate())

In [7]:
initDF = (spark\
  .readStream\
  .format("rate")\
  .option("rowsPerSecond", 1)\
  .load()
  )

In [8]:
print("Streaming DataFrame : " , initDF.isStreaming)

Streaming DataFrame :  True


In [9]:
resultDF = initDF\
        .withColumn("result", col("value") + lit(1))

In [10]:
query=resultDF\
  .writeStream\
  .outputMode("append")\
  .option("truncate", False)\
  .format("console")\
  .start()

In [11]:
query.awaitTermination(timeout=10)

False

In [12]:
query.stop()

In [17]:
df_socket = (spark
         .readStream.format("socket")
         .option("host", "localhost")
         .option("port", 1234)
         .load())

In [18]:
#words = lines.select(split(col("value"), "\\s").alias("word"))
words = df_socket.select(explode(split(df_socket.value, " ")).alias("word"))
counts = words.groupBy("word").count()

In [19]:
counts.printSchema()

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



In [20]:
checkpointDir = "chkpnt_2"
streamingQuery = (counts.writeStream
                  .format("console")
                  .outputMode("complete")
                  .option("checkpointLocation", checkpointDir)
                  .start())
streamingQuery.awaitTermination(timeout=10)

False

In [21]:
# ncat -lvp 1234

In [22]:
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType,DoubleType)

In [23]:
# // Extract the Name of the stock from the file name.
schema = StructType([
  StructField("Date", StringType(), True),
  StructField("Open", DoubleType(), True),
  StructField("High", DoubleType(), True),
  StructField("Low", DoubleType(), True),
  StructField("Close", DoubleType(), True),
  StructField("Adjusted Close", DoubleType(), True),
  StructField("Volume", DoubleType(), True)
])


In [24]:
import os 
st='/Data/AABA_2006-01-01_to_2018-01-01.csv'
@udf()
def getFileName(str):
    all_name=str.split('/')[-1]
    ticker = all_name.split('_')[0]
    return ticker

In [25]:
df = spark.readStream.format("csv")\
    .schema(schema).option('maxFilesPerTrigger',2)\
    .load('stream_data')
#.option("path", "Data")
df_ticker = df.withColumn('Name',getFileName((input_file_name())))

In [26]:
df_ticker.isStreaming

True

In [27]:
checkpointDir = "chkpnt_3"
streamingQuery = (df_ticker.writeStream
                  .format("console")
                  .outputMode("append")
                  .option("checkpointLocation", checkpointDir)
                  .start())
streamingQuery.awaitTermination(timeout=10)

False

In [28]:
streamingQuery.stop()

In [29]:
df_agg = df_ticker.groupby(year('Date')).max('High')

In [30]:
checkpointDir = "chkpnt_4"
streamingQuery = (df_agg.writeStream
                  .format("console")
                  .outputMode("complete")
                  .option("checkpointLocation", checkpointDir)
                  .start())
streamingQuery.awaitTermination(timeout=10)

False

In [31]:
streamingQuery.stop()

In [3]:
!pip install graphframes

Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Installing collected packages: graphframes
Successfully installed graphframes-0.6


In [7]:
from pyspark.sql import SQLContext
#from pyspark.sql import SparkSession

In [8]:
from graphframes import *
#spark = SparkSession.builder.appName('graphframes').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [9]:
vert = spark.createDataFrame([('a','Alice',34),
                             ('b','bob',36),
                             ('c','Charli',30),
                             ('d','David',29),
                             ('e','Esther',32),
                             ('f','Fanny',36)],
                            ["id", "name", "age"])

In [10]:
Edg = spark.createDataFrame([('a','e','friend'),
                             ('f','b','follow'),
                             ('c','e','friend'),
                             ('a','b','friend'),
                            ('b','c','follow'),
                            ('c','b','follow'),
                            ('f','c','follow'),
                            ('e','f','follow'),
                            ('e','d','friend'),
                            ('d','a','friend'),],
                           ["src", "dst", "relationship"])

In [11]:
vert.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  a| Alice| 34|
|  b|   bob| 36|
|  c|Charli| 30|
|  d| David| 29|
|  e|Esther| 32|
|  f| Fanny| 36|
+---+------+---+



In [12]:
Edg.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  e|      friend|
|  f|  b|      follow|
|  c|  e|      friend|
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
+---+---+------------+



In [13]:
from graphframes import *

In [14]:
g = GraphFrame(vert, Edg)

In [15]:
g.inDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  f|       1|
|  e|       2|
|  d|       1|
|  c|       2|
|  b|       3|
|  a|       1|
+---+--------+



In [16]:
g.outDegrees.show()

+---+---------+
| id|outDegree|
+---+---------+
|  f|        2|
|  e|        2|
|  d|        1|
|  c|        2|
|  b|        1|
|  a|        2|
+---+---------+



In [17]:
from pyspark.sql.functions import udf,col

In [18]:
@udf()
def src_gt_dst_func(src,dst):
    if src>=dst:
        return 'Delete'
    else:
        return 'Keep'

In [19]:
ed = g.edges

In [20]:
ed.withColumn("Status", src_gt_dst_func(col("src"),col("dst"))).show(truncate=False)

+---+---+------------+------+
|src|dst|relationship|Status|
+---+---+------------+------+
|a  |e  |friend      |Keep  |
|f  |b  |follow      |Delete|
|c  |e  |friend      |Keep  |
|a  |b  |friend      |Keep  |
|b  |c  |follow      |Keep  |
|c  |b  |follow      |Delete|
|f  |c  |follow      |Delete|
|e  |f  |follow      |Keep  |
|e  |d  |friend      |Delete|
|d  |a  |friend      |Delete|
+---+---+------------+------+



In [21]:
g1 = g.filterVertices("age>30").filterEdges("relationship='friend'")

In [22]:
g1.vertices.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  a| Alice| 34|
|  b|   bob| 36|
|  e|Esther| 32|
|  f| Fanny| 36|
+---+------+---+



In [23]:
g1.edges.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  e|      friend|
|  a|  b|      friend|
+---+---+------------+



In [24]:
spark.sparkContext.setCheckpointDir("chkpointsdir")

In [25]:
result = g.connectedComponents()

In [27]:
result.show() #connect to socket => ncat -lvp 1234

+---+------+---+------------+
| id|  name|age|   component|
+---+------+---+------------+
|  a| Alice| 34|412316860416|
|  b|   bob| 36|412316860416|
|  c|Charli| 30|412316860416|
|  d| David| 29|412316860416|
|  e|Esther| 32|412316860416|
|  f| Fanny| 36|412316860416|
+---+------+---+------------+



In [28]:
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(c)")

In [29]:
motifs.filter("a.id !=c.id ").show()

+---------------+--------------+---------------+--------------+---------------+
|              a|             e|              b|            e2|              c|
+---------------+--------------+---------------+--------------+---------------+
|[c, Charli, 30]|[c, e, friend]|[e, Esther, 32]|[e, f, follow]| [f, Fanny, 36]|
| [a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, f, follow]| [f, Fanny, 36]|
| [f, Fanny, 36]|[f, c, follow]|[c, Charli, 30]|[c, e, friend]|[e, Esther, 32]|
|   [b, bob, 36]|[b, c, follow]|[c, Charli, 30]|[c, e, friend]|[e, Esther, 32]|
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|
|[c, Charli, 30]|[c, e, friend]|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|
| [a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|
|[e, Esther, 32]|[e, f, follow]| [f, Fanny, 36]|[f, c, follow]|[c, Charli, 30]|
| [f, Fanny, 36]|[f, b, follow]|   [b, bob, 36]|[b, c, follow]|[c, Charli, 30]|
| [a, Alice, 34]|[a, b, friend]|   [b, b