In [0]:
import numpy as np
# Draw 20000 uniform random floats and distribute them as an RDD.
# NOTE(review): `sc` is presumably the SparkContext injected by the notebook runtime — confirm.
data = np.random.rand(20000)
rdd = sc.parallelize(data)
# Print the number of partitions the data was split into.
print(rdd.getNumPartitions())


In [0]:
# This means that 8 tasks can run in parallel on 8 nodes (one task per partition).

In [0]:
# Words to strip from every line of the text file.
stop_words = ['the', 'a', 'of']
rdd2 = sc.textFile('FileStore/tables/text_gradient.txt')

# Filter word by word against the stop-word list instead of chaining
# `replace(' the ', ' ')` calls: the chained form never used `stop_words`,
# missed stop words at the start/end of a line, missed the second of two
# consecutive stop words, and missed capitalised forms such as "The".
# Splitting on a single space leaves all other whitespace in the line untouched.
stop_word_set = set(stop_words)
rdd_filtred = rdd2.map(
    lambda line: ' '.join(
        word for word in line.split(' ') if word.lower() not in stop_word_set
    )
)
rdd_filtred.take(5)


In [0]:
import numpy as np
import pandas as pd


def random_indexes(arr, rate = 0.1):
    """Pick a random subset of positions of ``arr``.

    :param arr: any sized sequence; only its length is used
    :param rate: fraction of positions to keep (``int(len(arr) * rate)`` are returned)
    :return: numpy array of distinct positions, in random order
    """
    total = len(arr)
    keep = int(total * rate)
    positions = np.arange(total)
    # Shuffle in place, then keep the leading slice as the random sample.
    np.random.shuffle(positions)
    return positions[:keep]

def random_nan(arr, rate = 0.1):
    """Replace a random fraction of the entries of ``arr`` with NaN.

    :param arr: numpy array; integer arrays are converted to float first
        because integers cannot hold NaN
    :param rate: fraction of entries to blank out
    :return: the array with NaNs inserted. Integer input yields a new float
        array; any other input is modified in place and returned.
    """
    index_to_nan = random_indexes(arr, rate)
    # `np.int` was removed from NumPy, so test the dtype family instead; this
    # also covers every integer width, not only the platform default.
    if np.issubdtype(arr.dtype, np.integer):
        arr = arr.astype('float')
    arr[index_to_nan] = np.nan
    return arr


def generate_random_string(vocab, string_length):
    """Build a space-separated string of words drawn at random from ``vocab``.

    :param vocab: sequence of words to sample from (with replacement)
    :param string_length: number of words to draw
    :return: the drawn words joined by single spaces
    """
    sampled_words = np.random.choice(vocab, string_length)
    return " ".join(sampled_words)


def generate_fake_email(n):
    """
    Generate fake e-mail addresses with the ``faker`` package.

    :param n: int number of emails to generate
    :return: numpy array of emails
    """
    import faker

    generator = faker.Faker()
    emails = []
    for _ in range(n):
        emails.append(generator.email())
    return np.array(emails)


def format_column_name(col):
    """Return ``col`` with every space replaced by an underscore."""
    pieces = col.split(' ')
    return '_'.join(pieces)


# Missing values (None, NaN, or the text "nan" in any letter case) become SQL NULL.
def format_value(v):
    """Render a Python value as a SQL literal.

    :param v: value to render
    :return: ``'NULL'`` for None, NaN and the string "nan" (case-insensitive);
        a single-quoted, escaped literal for other strings; ``str(v)`` otherwise
    """
    if v is None:
        return 'NULL'

    if isinstance(v, str):
        # Lowercase only for the comparison so the stored text keeps its case.
        if v.lower() == "nan":
            return 'NULL'
        # Escape backslashes first, then quotes, so a value cannot close the literal.
        escaped = v.replace("\\", "\\\\").replace("'", "\\'")
        return "'" + escaped + "'"

    # `np.float` was removed from NumPy, and `v is np.nan` only matched the
    # np.nan singleton; test the value itself so every float NaN is caught.
    if isinstance(v, (float, np.floating)) and np.isnan(v):
        return 'NULL'

    return str(v)

def create_table_schema(table_name, df):
    """Build the ``CREATE TABLE`` statement describing ``df``.

    :param table_name: name of the SQL table
    :param df: pandas DataFrame whose columns define the schema
    :return: the statement produced by ``pd.io.sql.get_schema`` for a
        ``mysql+pymysql://`` engine, stripped and terminated with ``;``
    """
    # `create_engine` was never imported in the notebook; import it locally,
    # the same way generate_fake_email imports faker.
    from sqlalchemy import create_engine

    engine = create_engine('mysql+pymysql://', echo=False)

    sql = pd.io.sql.get_schema(df, table_name, con=engine).strip()
    return sql + ";"

def create_table_schemas(tables_names, dfs):
    """Concatenate the ``CREATE TABLE`` statements of several DataFrames.

    :param tables_names: iterable of table names
    :param dfs: iterable of DataFrames, paired positionally with the names
    :return: one string holding each statement followed by a newline
    """
    statements = [
        create_table_schema(name, frame) + "\n"
        for name, frame in zip(tables_names, dfs)
    ]
    return "".join(statements)

def df_to_sql_table(table_name, df):
    """Return the schema statement and the INSERT statements for one table.

    :param table_name: name of the SQL table
    :param df: pandas DataFrame to export
    :return: the schema, a newline, then the INSERT statements
    """
    ddl = create_table_schema(table_name, df)
    dml = df_to_sql_insert(table_name, df)
    return "\n".join((ddl, dml))

def df_to_sql_insert(table_name, df):
    """Turn every row of ``df`` into an INSERT statement.

    :param table_name: name of the SQL table
    :param df: pandas DataFrame whose rows are exported
    :return: the statements, one per line, followed by a trailing newline
    """
    records = df.to_dict(orient='records')
    lines = [generate_insert_statement(table_name, record) for record in records]
    return '\n'.join(lines) + '\n'


def generate_insert_statement(table_name, values_as_dict):
    """Build a single INSERT statement from a column -> value mapping.

    :param table_name: name of the SQL table
    :param values_as_dict: dict mapping column names to the row's values
    :return: ``INSERT INTO <table> (<columns>) VALUES (<values>);``
    """
    column_list = ", ".join(values_as_dict.keys())
    value_list = ", ".join(format_value(item) for item in values_as_dict.values())
    return "INSERT INTO {} ({}) VALUES ({});".format(table_name, column_list, value_list)


def dfs_to_sql(tables_names, dfs, file_path=None):
    """Export several DataFrames as one SQL script.

    :param tables_names: iterable of table names
    :param dfs: iterable of DataFrames, paired positionally with the names
    :param file_path: optional path; when given, the script is also written there
    :return: all schema statements, a newline, then all INSERT statements
    """
    schema_sql = create_table_schemas(tables_names, dfs)
    insert_sql = "".join(
        df_to_sql_insert(name, frame) for name, frame in zip(tables_names, dfs)
    )
    script = schema_sql + "\n" + insert_sql

    if file_path is not None:
        with open(file_path, 'w') as out:
            out.write(script)

    return script



def df_to_sql_file(table_name, df, filepath):
    """Write the schema and INSERT statements of ``df`` to ``filepath``.

    :param table_name: name of the SQL table
    :param df: pandas DataFrame to export
    :param filepath: destination file, opened in write mode
    """
    script = df_to_sql_table(table_name, df)
    with open(filepath, 'w') as handle:
        handle.write(script)


def generate_df(requirements, nrows):
    """Build a DataFrame column by column from generator specifications.

    :param requirements: iterable of dicts with a ``'name'`` (column name) and a
        ``'generator'`` callable invoked as ``generator(nrows, df)``; the frame
        passed in holds the columns built so far, so later columns can depend
        on earlier ones
    :param nrows: number of rows
    :return: the populated pandas DataFrame, indexed ``0 .. nrows - 1``
    """
    frame = pd.DataFrame(index=np.arange(nrows))
    for spec in requirements:
        column, make_values = spec['name'], spec['generator']
        frame[column] = make_values(nrows, frame)
    return frame




def sigmoid(x):
    """Logistic function ``1 / (1 + exp(-x))``, applied element-wise by NumPy."""
    denominator = 1 + np.exp(-x)
    return 1 / denominator

In [0]:


import numpy as np

def generate_price(df, nrows):
    """Compute a synthetic house price from the other columns of ``df``.

    :param df: DataFrame with the columns ``size``, ``nb_rooms``,
        ``orientation`` and ``garden``
    :param nrows: unused; kept so the signature matches the other generators
    :return: Series of prices, one per row of ``df``
    """
    # Price adjustment per orientation, applied in this order.
    orientation_effects = (
        ('Sud', 20000),
        ('Nord', -20000),
        ('Ouest', 1000),
        ('Est', -3000),
    )

    # Base price grows with the log of the size, plus a bonus per extra room.
    price = 45000 * np.log(1 + df['size'])
    price += 20000 * (df['nb_rooms'] - 1)

    for direction, delta in orientation_effects:
        price += delta * (df['orientation'] == direction).astype('int')

    # Flat bonus for houses with a garden.
    price += 50000 * (df['garden'] == 1).astype('int')

    return price

def houses(name, nrows=100000000):
    """Generate a synthetic housing dataset and write it to a CSV file.

    Columns: ``size`` (absolute value of a normal draw centred on 150 with
    scale 50), ``nb_rooms`` (1 to 3), ``garden`` (0 or 1), ``orientation``
    (Sud / Est / Ouest / Nord) and ``price`` (derived by ``generate_price``).

    :param name: path of the CSV file to write (no index column)
    :param nrows: number of rows to generate. The default keeps the previous
        hard-coded size; the whole frame is built in pandas before writing,
        so pass a smaller value for quick runs.
    """
    requirements = [

        {
            'name': 'size',
            'generator': lambda nrows, df: np.abs(150 + 50 * np.random.randn(nrows))
        },
        {
            'name': 'nb_rooms',
            'generator': lambda nrows, df: np.random.randint(1, 4, nrows)
        },
        {
            'name': 'garden',
            'generator': lambda nrows, df: np.random.randint(0, 2, nrows)
        },
        {
            'name': 'orientation',
            'generator': lambda nrows, df: np.random.choice(['Sud', 'Est', 'Ouest', 'Nord'], nrows)
        },
        {
            # Must stay last: the price is computed from the columns above.
            'name': 'price',
            'generator': lambda nrows, df: generate_price(df, nrows)
        },
    ]

    df = generate_df(requirements, nrows)

    df.to_csv(name, index=False)


def house_size_is_sold_in_six_month():
    """Generate a toy classification dataset and write it to ``house_classification.csv``.

    The frame has 800000 rows with a ``size`` column (normal draw centred on
    150 with scale 50) and a boolean ``sold_in_six_month`` column that is True
    when ``size`` exceeds 150.
    """
    def draw_size(nrows, df):
        return 150 + 50 * np.random.randn(nrows)

    def sold_flag(nrows, df):
        return df['size'] > 150

    columns = [
        {'name': 'size', 'generator': draw_size},
        {'name': 'sold_in_six_month', 'is_target': True, 'generator': sold_flag},
    ]

    frame = generate_df(columns, nrows=800000)
    frame.to_csv('house_classification.csv')

# Generate the synthetic housing dataset and write it to houses.csv.
# NOTE(review): houses() builds 100000000 rows in pandas before writing — expect heavy memory use and a long run.
houses('houses.csv')

In [0]:
# Read the generated CSV with pandas, then convert the pandas frame to a Spark DataFrame.
# NOTE(review): `spark` is presumably the SparkSession injected by the notebook runtime — confirm.
df = pd.read_csv('houses.csv')
sparkDf=spark.createDataFrame(df)
# Print the column names and types of the Spark DataFrame.
sparkDf.printSchema()

In [0]:
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
def _to_java_object_rdd(rdd):
    """Convert a Python RDD into its JVM-side object RDD.

    The RDD is re-serialised with batched pickle and then handed to the
    MLlib ``SerDe.pythonToJava`` helper through the py4j gateway.

    :param rdd: PySpark RDD
    :return: the object returned by ``SerDe.pythonToJava``
    """
    pickled = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    serde = pickled.ctx._jvm.org.apache.spark.mllib.api.python.SerDe
    return serde.pythonToJava(pickled._jrdd, True)

# Convert the DataFrame's underlying RDD to its JVM-side representation.
JavaObj = _to_java_object_rdd(sparkDf.rdd)

# Ask Spark's SizeEstimator for the size in bytes of that JVM object and print it.
# NOTE(review): this may measure the RDD handle rather than the materialised data — confirm.
nbytes = sc._jvm.org.apache.spark.util.SizeEstimator.estimate(JavaObj)
print(nbytes)

In [0]:
# Keep only the houses with more than 2 rooms and preview the first 10 rows.
df_filtred = sparkDf.filter(sparkDf.nb_rooms > 2)
df_filtred.show(10)

In [0]:
from pyspark.sql import functions as F
# Average price and average size over the whole (unfiltered) dataset.
prix_et_surface_moyenne = sparkDf.agg(F.avg('price'),F.avg('size'))
# show() prints the table itself and returns None, so passing it to print()
# displayed the table first and then the label followed by "None".
# Print the label, then show the table.
print("le prix moyen du dataset original : ")
prix_et_surface_moyenne.show()


In [0]:
# Average price and average size restricted to the filtered houses (more than 2 rooms).
prix_et_surface_moyenne_filtred = df_filtred.agg(F.avg('price'),F.avg('size'))
prix_et_surface_moyenne_filtred.show()


In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.functions import stddev, mean, col
# Compute the mean and standard deviation of `size` in a single Spark job.
df_stats = sparkDf.select(
    mean(col('size')).alias('mean'),
    stddev(col('size')).alias('std')
).collect()

# Keep the statistics under their own names: binding them to `mean` / `std`
# shadowed the `mean` function imported above, so re-running this cell failed
# with "'float' object is not callable".
size_mean = df_stats[0]['mean']
size_std = df_stats[0]['std']
# Standardise `size` (zero mean, unit standard deviation) in a new DataFrame;
# the original sparkDf is left unchanged, as the second preview shows.
sparkDf_normalized = sparkDf.withColumn('size', (F.col('size') - size_mean)/size_std)
sparkDf_normalized.show(5)
sparkDf.show(5)


In [0]:
# Split the data 80/20 into train and test sets, passing a fixed seed to randomSplit.
df_train, df_test = sparkDf.randomSplit([0.8, 0.2], seed=91)

# Index the string column `orientation` into `orientationIndex`,
# then one-hot encode that index into `orientationOHE`.
string_indexer = StringIndexer(inputCol='orientation', outputCol='orientation'+'Index')
one_hot_encoder = OneHotEncoder(inputCol=string_indexer.getOutputCol(),
                               outputCol='orientation'+'OHE')


In [0]:
from pyspark.ml.feature import SQLTransformer

# `price` is a continuous regression target. Running it through StringIndexer
# replaced each distinct price with a frequency-rank category id (so the model
# learned ranks, not prices) and made transform() fail on test prices never
# seen during fit. Copy `price` to `label` unchanged instead; the variable name
# is kept because the pipeline cell refers to it.
label_to_index = SQLTransformer(statement='SELECT *, price AS label FROM __THIS__')

In [0]:
# Fit the orientation indexer on the training set and display the indexed training rows.
string_model_indexer= string_indexer.fit(df_train)
display(string_model_indexer.transform(df_train))

size,nb_rooms,garden,orientation,price,orientationIndex
0.00015270783669052437,3,1,Est,87006.8713280116,2.0
0.0011854990506208,3,0,Nord,20053.315860567403,3.0
0.0016584871017357,1,1,Sud,70074.57009988211,0.0
0.0020338415652076,2,1,Sud,90091.42992492826,0.0
0.0024152917479796,1,0,Ouest,1108.5570828557711,1.0
0.0027560153426122,2,1,Nord,50123.85010231184,3.0
0.0027939365611189,1,1,Ouest,51125.55183487789,1.0
0.0038327291516679,1,0,Est,-2827.856866850937,2.0
0.0045842295157569,2,1,Sud,90205.81892722788,0.0
0.0050620955532281,3,1,Nord,70227.21968001034,3.0


In [0]:
from pyspark.ml.feature import VectorAssembler
# Model inputs: the one-hot encoded orientation plus the size, room count and garden columns.
numeric_cols = ["orientationOHE","size","nb_rooms","garden"]
assembler_inputs = numeric_cols
# Assemble the input columns into a single `features` vector column.
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')


In [0]:
from pyspark.ml.regression import LinearRegression
# Linear regression reading the `features` and `label` columns, with regParam=0.3.
lr = LinearRegression(featuresCol = 'features', labelCol='label',regParam=0.3)




In [0]:
from pyspark.ml import Pipeline
 
# Chain the stages in order: index orientation, one-hot encode it, build the
# label column, assemble the feature vector, then fit the regression.
pipeline = Pipeline(stages=[string_indexer, one_hot_encoder, label_to_index, vec_assembler, lr])
# Fit every stage on the training split.
pipeline_model = pipeline.fit(df_train)


In [0]:
# Apply the fitted pipeline to the held-out test split and display the result.
df_pred = pipeline_model.transform(df_test)
display(df_pred)

In [0]:
from pyspark.ml.linalg import VectorUDT

# The CSV writer cannot serialise ML vector columns (here the one-hot and
# assembled feature vectors), so writing df_pred as-is raises an error.
# Keep only the non-vector columns.
csv_columns = [
    field.name
    for field in df_pred.schema.fields
    if not isinstance(field.dataType, VectorUDT)
]
# mode='overwrite' lets the notebook be re-run without failing on the existing
# output directory; header=True keeps the column names in the files.
df_pred.select(csv_columns).write.csv('predictions.csv', mode='overwrite', header=True)