In [3]:
import pandas as pd
import string
import re
import nltk
import collections

from nltk.corpus import stopwords
from nltk.stem.snowball import SnowballStemmer
import pymorphy2


stop_words = stopwords.words('russian')
morph = pymorphy2.MorphAnalyzer()

In [None]:
def preprocessing(text):
    text = str(text)
    text = re.sub(r"^\s+|\n|\r|\t|\s+$", "", text) # отступы
    text = re.sub(r'.[0-9]+', ' ', text) # цифры
    text = re.sub(r'.[a-z]+', ' ', text) # английские символы
    text = re.sub(r'[^\w\s]',' ', text) # пунктуация
    text = text.lower() # регистр
    text = " ".join([word for word in text.split(" ") if (word not in stop_words)]) # стоп-слова
    text = " ".join([t for t in text.split(" ") if len(t) > 0]) # лишние пробелы
    text = " ".join([morph.parse(word)[0].normal_form for word in text.split(" ")]) # лемматизация
    return str(text)

In [1]:
filename_source = "datapikabu_dataset_new.csv"
filename = "pikabu_dataset_longText.csv"
filename_new = "pikabu_dataset_clear.csv"

hot = "hot_dataset.csv"
hot_new = "hot_dataset_processed.csv"

In [None]:
df = pd.read_csv(filename_source)
df_enough = df[df['Text'].map(lambda x: len(str(x)) > 1000)]
df_enough.to_csv(filename, index=False)

In [None]:
df = pd.read_csv(filename)
df.index

In [None]:
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType

from functools import reduce
from pyspark.sql.functions import col

from pyspark.sql import Row
from pyspark.sql import SQLContext

In [None]:
#os.environ["PYSPARK_PYTHON"] = "python3"
sc = SparkContext(master='local[*]')

spark = SparkSession \
    .builder \
    .appName("PySpark") \
    .getOrCreate()

In [None]:
%%time

df_spark = spark.read.csv(filename, inferSchema=True, header=True).toDF('Number', 'FileName', 'Title', 'Link', 'ArticleId', 'Date', 'Views',
       'Author', 'Tags', 'AmountComments', 'Rating', 'Text')

prepr = F.udf(preprocessing, StringType())

df_clear = df_spark.withColumn('Text', prepr(df_spark['Text'])) \
    .withColumn('Tags', prepr(df_spark['Tags'])) \
    .withColumn('Title', prepr(df_spark['Title'])) 

In [None]:
df_clear = df_clear.toPandas()
df_clear.to_csv(filename_new)

In [None]:
df = pd.read_csv(hot)
df.head()

In [6]:
df = pd.read_csv(filename_new)
filename = "pikabu_dataset_good.csv"
df.columns

Index(['Unnamed: 0', 'Number', 'FileName', 'Title', 'Link', 'ArticleId',
       'Date', 'Views', 'Author', 'Tags', 'AmountComments', 'Rating', 'Text'],
      dtype='object')

In [9]:
df = df[['Title', 'Link', 'Date', 'Views', 'Author', 'Tags', 'AmountComments', 'Rating', 'Text']]
df.to_csv(filename, index = None)

In [10]:
df = pd.read_csv(filename)
df.head()

Unnamed: 0,Title,Link,Date,Views,Author,Tags,AmountComments,Rating,Text
0,человек,https://pikabu.ru/story/o_lyudyakh_2801350,2014-11-05,,LisSiN,игра демиург текст,1.0,8.0,демиург мазукт прийти гость свой друг демиург ...
1,один история кафе добавка,https://pikabu.ru/story/eshche_odna_moya_istor...,2014-11-05,,bambaleilo,работа случай жизнь неприятный ситуация помощь...,11.0,11.0,писать работать кассир кафе время летний каник...
2,сначала добиться,https://pikabu.ru/story/snachala_dobeysya_2801509,2014-11-05,,Kakashkolub,сперва добиться паста притча идиотизм текст,8.0,-13.0,однажды встать кровать пойти завтракать стол с...
3,счастие это просто,https://pikabu.ru/story/schaste__yeto_prosto_2...,2014-11-05,,pryasha,мой стих текст лирика,8.0,9.0,гдеть наверное дуть бризктоть резать крупно за...
4,крым наш,https://pikabu.ru/story/kryim_nash_2801415,2014-11-05,,kaprichio,украина россия мой крым текст,60.0,-145.0,момент передача крым украина крым восемь магаз...


### Прочее

In [None]:
df_spark.withColumn('Text', reduce(preprocessing, col('Text'))) \
.withColumn('Title', F.map(preprocessing, col('Title'))) \
.withColumn('Tags', F.map(preprocessing, col('Tags'))) \
.toPandas().to_csv(hot_new)

In [None]:
sc = SparkContext(master='local[*]')
sqlContext = SQLContext(sc)      

In [None]:
spark = SparkSession \
    .builder \
    .appName("PySpark") \
    .getOrCreate()

df_spark = spark.read.csv(hot, inferSchema=True, header=True).toDF('Number', 'FileName', 'Title', 'Link', 'ArticleId', 'Date', 'Views',
       'Author', 'Tags', 'AmountComments', 'Rating', 'Text')

def rowwise_function(row):
    row_dict = row.asDict()
    row_dict['Text'] = preprocessing(row_dict['Text'])
    row_dict['Tags'] = preprocessing(row_dict['Tags'])
    row_dict['Title'] = preprocessing(row_dict['Title'])
    newrow = Row(**row_dict)
    return newrow

ratings_rdd = df_spark.rdd
ratings_rdd_new = ratings_rdd.map(lambda row: rowwise_function(row))
ratings_new_df = sqlContext.createDataFrame(ratings_rdd_new)
#ratings_new_df.show()

In [None]:
ratings_new_df.toPandas().to_csv(hot_new)

In [None]:
dfSchema = StructType([
    StructField('Number',IntegerType(), True),
    StructField('FileName', StringType(), True), 
    StructField('Title', StringType(),True), 
    StructField('Link', StringType(), True),
    StructField('ArticleId', IntegerType(), True), 
    StructField('Date', StringType(), True), 
    StructField('Views', IntegerType(), True),
    StructField('Author', StringType(), True), 
    StructField('Tags', StringType(), True), 
    StructField('AmountComments', IntegerType(), True), 
    StructField('Rating', IntegerType(), True), 
    StructField('Text', StringType(), True)
        ])

In [None]:
df_spark = spark.read.csv(filename, inferSchema=True, header=True).toDF('Number','FileName', 'Title', 'Link', 'ArticleId', 'Date', 'Views',
       'Author', 'Tags', 'AmountComments', 'Rating', 'Text')

def rowwise_function(row):
    row_dict = row.asDict()
    row_dict['Text'] = preprocessing(row_dict['Text'])
    row_dict['Tags'] = preprocessing(row_dict['Tags'])
    row_dict['Title'] = preprocessing(row_dict['Title'])
    newrow = Row(**row_dict)
    return newrow

In [None]:
ratings_rdd = df_spark.rdd
ratings_rdd_new = ratings_rdd.map(lambda row: rowwise_function(row))
ratings_new_df = sqlContext.createDataFrame(ratings_rdd_new, dfSchema)
#ratings_new_df.show()

In [None]:
def toCSVLine(data):
    return ','.join(str(d) for d in data)

lines = ratings_rdd_new.map(toCSVLine)
lines.saveAsTextFile(filename_new)

In [None]:
ratings_new_df.write.format('com.databricks.spark.csv').save(filename_new)

In [None]:
ratings_new_df.write.csv(filename_new)