In [1]:
from pyspark.sql.types import StructField, StructType, StringType, ArrayType, DoubleType, FloatType, IntegerType
from pyspark.sql import SQLContext
import pyspark_cassandra
import cassandra
from cassandra.cluster import Cluster
import hdfs

import os
import re
import pandas as pd
from datetime import datetime as dt

In [2]:
def clear(df):
    df = df.copy()
    df.content = df.content.apply(lambda r: re.sub('[^a-zA-ZáéíñóúüÁÉÍÑÓÚÜ]', ' ', r))
    df.content = df.content.apply(lambda r: re.sub('\s+', ' ', r))
    return df

def filtr(df):
    df = df.copy()
    print(df.shape)
    df['esp'] = df.content.apply(lambda r: re.sub('[^áéíñóúüÁÉÍÑÓÚÜ]', '', r))
    df = df[df.esp != '']
    df.drop('esp', axis=1, inplace=True)
    print(df.shape)
    return df

def filtr_sub_domain(d, sub_list=['gob', 'edu'], lab_list=[38, 36]):
    df = d.copy()
    for sub, lab in zip(sub_list, lab_list):
        df['filtr_{}'.format(lab)] = df.domain.apply(lambda r: '.{}.'.format(sub) in r or  r.endswith('.{}'.format(sub)))
        df.loc[df['filtr_{}'.format(lab)]==1, 'filtr_{}'.format(lab)] = lab
    df['lab'] = df[[column for column in df.columns if column.startswith('filtr_')]].sum(axis=1)
    df.drop(df[df['lab']==0].index, inplace=True)
    df.drop([column for column in df.columns if column.startswith('filtr')], axis=1, inplace=True)
    df.drop('politica', axis=1, inplace=True)
    return df

In [3]:
first_start, start = dt.now(), dt.now()
df = pd.DataFrame(columns=['domain', 'content', 'lab'])
key_word = '01_'
for file in os.listdir():
    if file.endswith('.csv') and '{}'.format(key_word) in file:
        d = pd.read_csv(file, sep='\t')
        df = pd.concat([df, d], axis=0)
df.drop_duplicates('domain', inplace=True)
print("Все файлы собраны и почищены от дубликатов за {}".format(dt.now() - start))
start = dt.now()
df = clear(df)
print("Датасет очищен за {}".format(dt.now() - start))
start = dt.now()
df = filtr(df)
print("Датасет отфильтрован от неиспанских текстов за {}".format(dt.now() - start))
start = dt.now()
df.content = df.content.apply(lambda r: r.lower())
# df.to_csv('{}_cont.csv'.format(key_word), sep='\t', index=False)
# df_mini = df.copy()
# df_mini.content = df_mini.content.apply(lambda r: ' '.join(list(r.split(' '))[:150]))
# df_mini.to_csv('{}_cont_mini.csv'.format(key_word), sep='\t', index=False)
print("Вся обработка и сохранение в два файла заняли {}".format(dt.now() - first_start))

Все файлы собраны и почищены от дубликатов за 0:00:01.251344
Датасет очищен за 0:00:21.858818
(2504, 3)
(2480, 3)
Датасет отфильтрован от неиспанских текстов за 0:00:30.308561
Вся обработка и сохранение в два файла заняли 0:00:54.545573


In [4]:
df.to_csv('content_neg_47.csv', sep='\t', index=False)

In [5]:
myshema = StructType([StructField("domain", StringType()), \
                      StructField("content", StringType()), \
                      StructField("lab", StringType())])
partitionNum = 100 # Increase this number if necessary
ds = sqlContext.createDataFrame(df, myshema).repartition(partitionNum)
name = 'content_neg_religion_2'
parquet_path = "hdfs://e1.getaura.ru/spark-common/vectors/{}.parquet".format(name)
ds.write.parquet(parquet_path, mode='overwrite')
"/spark-common/vectors/{}.parquet".format(name)

'/spark-common/vectors/content_neg_religion_2.parquet'

In [6]:
ds.show()

+--------------------+--------------------+---+
|              domain|             content|lab|
+--------------------+--------------------+---+
|           anaart.es|ana beltrán inici...| 55|
|          arigata.es|asociación de emp...| 55|
|    antoniolozano.es|antonio lozano te...| 55|
|      andresjarel.es|vídeo promocional...| 55|
|        dimepoker.cl|lo que necesitas ...| 18|
|abastecimiento.cy...|abastecimiento de...| 57|
|          acr.com.uy|a c r inmobiliari...| 57|
|     agenciaayala.es|agencia ayala com...| 57|
|alarconpropiedade...|inicio inicio qui...| 57|
|           aljuma.es|aljuma inmobiliar...| 57|
|abaspropiedades.c...|abas propiedades ...| 57|
|          activum.es|activum servicios...| 57|
|      afs-arq.com.ar|afs arquitectos i...| 57|
|           alamos.es|venta de pisos en...| 57|
|  almohadasba.com.ar|almohadas blanco ...| 57|
|valle.fciencias.u...|laboratorio de vi...| 36|
|       cicata.ipn.mx|inicio instituto ...| 36|
|              uai.cl|bienvenido a la u.