In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import posexplode, explode, udf, when, regexp_replace, monotonically_increasing_id, lower
from  pyspark.sql.types import StructType, StructField, StringType, MapType
from googletrans import Translator

In [3]:
# Arguments handed to the JVM launcher when the first SparkSession is created.
# They must be in the environment before the session is built.
_SUBMIT_ARGS = """
    --conf spark.app.name="teste"
    --conf spark.driver.memory=12g
    --conf spark.executor.memory=12g
    pyspark-shell
    """
os.environ['PYSPARK_SUBMIT_ARGS'] = _SUBMIT_ARGS

In [4]:
# Create (or reuse) the session for this notebook.
# NOTE(review): PYSPARK_SUBMIT_ARGS sets spark.app.name="teste" while the
# builder asks for "spk"; presumably the builder value wins — confirm and keep one.
session_builder = SparkSession.builder.appName("spk")
spark = session_builder.getOrCreate()

22/05/19 14:35:24 WARN Utils: Your hostname, DP6-PE07RZJL resolves to a loopback address: 127.0.1.1; using 192.168.15.200 instead (on interface wifi0)
22/05/19 14:35:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/19 14:35:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [47]:
# Input directory read by spark.read.json and output directory for the parquet write.
DADOS: str = "../dados/train/"
SAIDA: str = "../dados/tratados/"

In [48]:
# Schema applied when reading the JSON files: two top-level fields, each a
# string -> string map (keys are presumably cell identifiers — confirm against the data).
_string_map = MapType(StringType(), StringType())
json_schema = (
    StructType()
    .add('cell_type', _string_map, False)
    .add('source', _string_map, False)
)

In [120]:
# Read every JSON file under DADOS as a multi-line document, using the explicit schema.
json_reader = spark.read.schema(json_schema).option("multiline", "true")
df = json_reader.json(DADOS)


In [97]:
# Keep only 10,000 rows of the input.
# NOTE(review): this cell was executed (In [97]) before the data was re-read
# (In [120]), so it looks like a development-time sample; on a top-to-bottom run
# it truncates the dataset. limit() without an ordering does not guarantee which
# rows are kept — confirm whether it should stay.
df = df.limit(10_000)

In [121]:
# Tag every input row with a surrogate id so the exploded cell types and cell
# sources can be joined back together later.
#
# monotonically_increasing_id() is non-deterministic (its values depend on the
# partition layout), and `df` is evaluated twice downstream — once for
# df_cell_type and once for df_source — before both are joined on note_id.
# Materializing the ids once with localCheckpoint() guarantees both branches
# see exactly the same note_id for the same row, and avoids parsing the JSON twice.
df = df.withColumn('note_id', monotonically_increasing_id()).localCheckpoint()

In [122]:
# One row per entry of the `cell_type` map. posexplode on a map yields the
# columns (pos, key, value); the value column is exposed as `tipo`.
cell_type_entries = posexplode(df['cell_type'])
df_cell_type = (
    df
    .select(cell_type_entries, df['note_id'])
    .withColumnRenamed('value', 'tipo')
)

In [123]:
# One row per entry of the `source` map: columns (key, value) plus note_id.
source_entries = explode(df['source'])
df_source = df.select(source_entries, df['note_id'])

In [124]:
# Materialize the exploded sources now and cut the lineage before the regex pass.
df_source = df_source.localCheckpoint(eager=True)



In [125]:
# Text clean-up applied to `value`, in this exact order (each step sees the
# output of the previous one):
#   1. anything between '<' and '>' is replaced by a single space
#   2. markdown images `![alt](...)` are replaced by their alt text
#   3. b'...' literals are replaced by the placeholder '< bytes >'
#   4. b"..." literals are replaced by the placeholder '< bytes >'
_substitutions = [
    (r'<[^>]+>', ' '),
    (r'\!\[([\S\s]*?)\][\S\s]*?\)', '$1'),
    (r"b\'[\s\S]*?\'", '< bytes >'),
    (r'b\"[\s\S]*?\"', '< bytes >'),
    # Disabled in the original cell (they would run after the steps above):
    # (r'<\s*table[\S\s]*?table\s*>', '< table >'),
    # (r'<[\S\s]*?src\s*=[\S\s]*?>', '< image >'),
]

cleaned_value = df_source['value']
for pattern, replacement in _substitutions:
    cleaned_value = regexp_replace(cleaned_value, pattern, replacement)

df_source = df_source.withColumn('value', cleaned_value)



In [126]:
# Pair each cell's type with its cleaned source text: both sides share the map
# key and the note_id; the key is no longer needed after the join.
join_keys = ['key', 'note_id']
df_t = df_cell_type.join(df_source, on=join_keys, how='inner').drop('key')

In [127]:
# Materialize the joined result so the inspections and the write below reuse it.
df_t = df_t.localCheckpoint(eager=True)



In [None]:
from pyspark.sql.functions import length

In [94]:
# Spot-check one cell (note 65, position 25) to preview what replacing tags
# that carry a `src` attribute with '< image >' would produce.
# The column is taken from df_t itself: the original referenced
# df_source['value'], a column of a different DataFrame, which only resolves by
# accident of shared lineage and is ambiguous/fragile.
(
    df_t
    .filter((df_t.note_id == 65) & (df_t.pos == 25))
    .withColumn(
        'value',
        regexp_replace(df_t['value'], r'<[\S\s]*?src\s*=[\S\s]*?>', '< image >'),
    )
    .head()
)


Row(note_id=65, pos=25, tipo='markdown', value='# Introduction\n* **Note: Turn on your GPU.**\n* In this kernel we will generate new abstract arts using **DCGANs**. DCGANs is a method which used for generating new image data and it is very effective. \n* Here is a example of bedrooms generated by DCGANs: < image ><br>\n         <p class="card-text">A kernel about generating clothes using traditional GANs</p>\n         <a href="https://www.kaggle.com/mrhippo/fashion-dataset-gans" class="btn btn-primary" style="color:white;">Go to Post</a>\n      </div>\n    </div>\n  </div> \n</div>\n\n\n<hr>\n\n\n## Content\n* We are using [Abstract Art Gallery](https://www.kaggle.com/bryanb/abstract-art-gallery) dataset.\n\n\n* Imports and Dataset\n* Preparing Data\n* Generator\n* Discriminator\n* DCGANs\n* Training\n* Conclusion')

In [None]:
# How many cells have more than 5000 characters of text?
is_long_cell = length(df_t['value']) > 5000
df_t.where(is_long_cell).count()

In [119]:
# Show the last five cells whose text is longer than 5000 characters.
is_long_cell = length(df_t['value']) > 5000
df_t.where(is_long_cell).tail(5)



[Row(note_id=9993, pos=1, tipo='code', value='# I- converting to continuous data:\n\ndef continuous_data(columns): \n# columns = [\'pct_black/hispanic\', \'pct_free/reduced\', \'county_connections_ratio\',\n#           \'pp_total_raw\']     \n\n    d_infos_ = pd.DataFrame(d)\n\n    for column in columns :\n        datas = []\n        if column == \'pp_total_raw\':\n            for data in d[column]:\n                data = re.findall(r"[-+]?\\d*\\.\\d+|\\d+", data)\n                data[0] = int(float(data[0]))\n                data[1] = int(float(data[1]))\n                datas.append(sum(data)/len(data))\n        else :\n            for data in d[column]:\n                data = re.findall(r"[-+]?\\d*\\.\\d+|\\d+", data)\n                data[0] = float(data[0])\n                data[1] = float(data[1])\n                datas.append(sum(data)/len(data))\n        d_infos_[column] = datas\n    return d_infos_\n\n# II- Top 10 products with higher engagement_index per district:\n\ndef T

In [28]:
def traduzir(value, translator):
    """Return `value` lower-cased, translated to English when it is not English.

    Args:
        value: Plain Python string with the cell text (may be None or blank).
        translator: Object exposing `detect(text).lang` and
            `translate(text, dest='en').text` (e.g. a googletrans Translator).

    Returns:
        The lower-cased English text; None/blank input is returned unchanged
        without calling the translator.
    """
    # Null or whitespace-only cells have nothing to detect or translate.
    if value is None or not value.strip():
        return value

    # This runs on plain Python strings inside a UDF, so the string method
    # str.lower() must be used; pyspark.sql.functions.lower builds a Column
    # expression and cannot be called on a str.
    if translator.detect(value).lang != 'en':
        return translator.translate(value, dest='en').text.lower()
    return value.lower()

# Spark UDF wrapper around traduzir; returns a string column.
# Note: a fresh Translator is instantiated for every row that is processed.
s_traduzir = udf(lambda text: traduzir(text, Translator()), StringType())

In [29]:
# Translate only markdown cells; every other cell keeps its text untouched.
is_markdown = df_t['tipo'] == 'markdown'
translated_value = when(is_markdown, s_traduzir(df_t['value'])).otherwise(df_t['value'])
df_f = df_t.withColumn('value', translated_value)

In [30]:
# Persist the cleaned cells as parquet, replacing any previous output in SAIDA.
# NOTE(review): this writes df_t, so the translated frame df_f built above is
# never saved — confirm whether df_f was the intended output.
df_t.write.parquet(SAIDA, mode='overwrite')



In [31]:
# Shut down the Spark session now that the output has been written.
spark.stop()