In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession ,Row
from pyspark.streaming import StreamingContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover,RegexTokenizer,Word2Vec
from pyspark.ml.classification import LogisticRegression



In [3]:
# Build (or reuse) the SparkSession for this notebook and keep a handle
# on its underlying SparkContext.
spark = (
    SparkSession.builder
    .appName('MLStreaming')
    .getOrCreate()
)
sc = spark.sparkContext

In [4]:
# First look at the tweets CSV on HDFS: treat the first line as the header
# and preview the first five rows.
data = spark.read.csv(
    path="hdfs:///user/edureka_448212/twitter_6q3ur.csv",
    header=True,
)
data.show(n=5)

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|    0| @user when a fat...|
|  2|    0|@user @user thank...|
|  3|    0|  bihday your maj...|
|  4|    0|#model   i love u...|
|  5|    0| factsguide: soci...|
+---+-----+--------------------+
only showing top 5 rows



In [11]:
# Print the column names and types of `data`.
# NOTE(review): this cell carries execution count 11, i.e. it was run after the
# schema-enforced read further down (execution count 10), so the integer types
# in the recorded output presumably come from that read. On a fresh
# top-to-bottom run this cell would describe the header-only read above instead
# -- confirm and consider moving it below the schema-enforced read.
data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- tweet: string (nullable = true)



In [8]:
import pyspark.sql.types as tp

# Explicit schema for the tweets CSV: integer id, integer label, free-text
# tweet. All three columns are declared nullable.
schema = tp.StructType([
    tp.StructField('id', tp.IntegerType(), True),
    tp.StructField('label', tp.IntegerType(), True),
    tp.StructField('tweet', tp.StringType(), True),
])

In [10]:
# Re-read the tweets CSV, this time applying the explicit `schema` so the
# columns get the declared types, then preview the first five rows.
data = spark.read.csv(
    path="hdfs:///user/edureka_448212/twitter_6q3ur.csv",
    schema=schema,
    header=True,
)
data.show(n=5)

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|    0| @user when a fat...|
|  2|    0|@user @user thank...|
|  3|    0|  bihday your maj...|
|  4|    0|#model   i love u...|
|  5|    0| factsguide: soci...|
+---+-----+--------------------+
only showing top 5 rows



In [13]:
stage1 = RegexTokenizer(inputCol='tweet',outputCol='tokens',pattern='\\W')
stage2 = StopWordsRemover(inputCol = 'tokens',outputCol='filtered_word')
stage3 = Word2Vec(inputCol= 'filtered_word',outputCol='vector',vectorSize= 100)
model = LogisticRegression(featuresCol = 'vector',labelCol ='label')

In [15]:
# setting up pipeline
pipeline =Pipeline(stages =[stage1,stage2,stage3,model])
model = pipeline.fit(data)

In [17]:
# Once the model is ready, persist the fitted pipeline to HDFS.
# Going through the writer with overwrite() keeps this cell re-runnable:
# a plain save() fails when the target path already exists, which breaks a
# second end-to-end run of the notebook.
model.write().overwrite().save("hdfs:///user/edureka_448212/mlmodel_tweeter")