From 78d4ef6ebe95ad5fac9644a517ed1fcfba541b13 Mon Sep 17 00:00:00 2001
From: faviovazquez
Date: Thu, 21 Sep 2017 10:40:19 -0500
Subject: [PATCH] Fix lookup test and update docs for Spark 2.0.x

---
 docs/index.rst            | 12 ++++++------
 optimus/df_transformer.py |  1 -
 optimus/utilities.py      |  8 ++++----
 tests/tests.py            |  2 +-
 4 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/docs/index.rst b/docs/index.rst
index 4749ca534..8d1c57493 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -619,7 +619,7 @@ dataFrame:
     population = [37800000,19795791,12341418,6489162]
 
     # Dataframe:
-    df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)
+    df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
 
     # DataFrameTransformer Instantiation:
     transformer = op.DataFrameTransformer(df)
@@ -1126,7 +1126,7 @@ Building a dummy dataFrame:
     population = [37800000,19795791,12341418,6489162]
 
     # Dataframe:
-    df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)
+    df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
 
     df.show()
 
@@ -1328,7 +1328,7 @@ Building a dummy dataFrame:
     population = [37800000,19795791,12341418,6489162]
 
     # Dataframe:
-    df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)
+    df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
 
     df.show()
 
@@ -1357,7 +1357,7 @@ New DF:
     transformer.show()
 
     # Replacing 'Caracas' and 'Ccs' in the 'city' column with 'Caracas':
-    transformer.lookup('city', ['Caracas', 'Ccs'], 'Caracas')
+    transformer.lookup('city', "Caracas", ['Caracas', 'Ccs'])
 
     # Printing new dataFrame:
     print('New dataFrame:')
@@ -1475,7 +1475,7 @@ See the example below for more explanation:
 
 
     # Dataframe:
-    df = sqlContext.createDataFrame(list(zip(id_, foods)), schema=schema)
+    df = op.spark.createDataFrame(list(zip(id_, foods)), schema=schema)
 
     df.show()
 
@@ -1590,7 +1590,7 @@ date_transform(self, column, current_format, output_format)
     population = [37800000,19795791,12341418,6489162]
 
     # Dataframe:
-    df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)
+    df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
 
     df.show()
 
diff --git a/optimus/df_transformer.py b/optimus/df_transformer.py
index 455c462a0..e4eacd8b4 100644
--- a/optimus/df_transformer.py
+++ b/optimus/df_transformer.py
@@ -24,7 +24,6 @@ def __init__(self, df):
         # Dataframe
         self._df = df
         # SparkContext:
-        # self._sql_context = SQLContext(self._df.sql_ctx)
         self._sql_context = self._df.sql_ctx
         self._number_of_transformations = 0
 
diff --git a/optimus/utilities.py b/optimus/utilities.py
index 4be2a5e1d..dddc64413 100644
--- a/optimus/utilities.py
+++ b/optimus/utilities.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 # Importing os module for operating system utilities
 import os
-# Importing SQLContext:
+# Importing SparkSession:
 from pyspark.sql.session import SparkSession
 # Importing module to delete folders
 from shutil import rmtree
@@ -19,7 +19,7 @@ class Utilities:
 
     def __init__(self):
-        # Setting SQLContext as a global variable of the class
+        # Setting spark as a global variable of the class
         self.spark = SparkSession.builder.enableHiveSupport().getOrCreate()
         # Setting SparkContext as a global variable of the class
         self.__sc = self.spark.sparkContext
 
@@ -103,7 +103,7 @@ def json_load_spark_data_frame_from_url(self, data_url):
 
     def read_dataset_parquet(self, path):
        """This function allows the user to read parquet files. It is important to clarify that this method is just based
-        on the sqlContext.read.parquet(path) Apache Spark method. Only assertion instructions have been added to
+        on the spark.read.parquet(path) Apache Spark method. Only assertion instructions have been added to
         ensure the user has more hints about what happened when something goes wrong.
 
         :param path Path or location of the file. Must be string dataType.
@@ -290,7 +290,7 @@ class Airtable:
     def __init__(self, path):
         # Setting airtable dataset variable
         self._air_table = None
-        # Setting SQLContext as a global variable of the class
+        # Setting spark as a global variable of the class
         self.spark = SparkSession()
         self.sc = self.spark.sparkContext
         # Reading dataset
diff --git a/tests/tests.py b/tests/tests.py
index 0a736fcfc..3afa42c07 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -149,7 +149,7 @@ def test_rename_col(spark_session):
 def test_lookup(spark_session):
     try:
         transformer = op.DataFrameTransformer(create_df(spark_session))
-        transformer.lookup('city', ['Caracas', 'Ccs'], 'Caracas')
+        transformer.lookup('city', "Caracas", ['Caracas', 'Ccs'])
         assert_spark_df(transformer.get_data_frame)
     except RuntimeError:
         logger.exception('Could not run lookup().')
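
Below is a minimal sketch of the usage that the documentation and test hunks above converge on, for reviewers who want to try the change locally. It assumes the Optimus module is imported as `op` and exposes the module-level `spark` SparkSession and `DataFrameTransformer` shown in the diff; the city and country values are illustrative stand-ins (only the population list appears in the docs), and the import line itself is an assumption rather than something the patch specifies.

    # Sketch only: `import optimus as op` is an assumed entry point; the
    # patch shows `op` already in scope in the docs examples.
    import optimus as op
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    schema = StructType([
        StructField('city', StringType(), True),
        StructField('country', StringType(), True),
        StructField('population', IntegerType(), True),
    ])

    # Illustrative rows; 'Ccs' is the abbreviated value the lookup normalizes.
    cities = ['Caracas', 'Ccs', 'Tokyo', 'Madrid']
    countries = ['Venezuela', 'Venezuela', 'Japan', 'Spain']
    population = [37800000, 19795791, 12341418, 6489162]

    # Spark 2.0.x entry point: op.spark is a SparkSession, so the old
    # sqlContext.createDataFrame calls become op.spark.createDataFrame.
    df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)

    transformer = op.DataFrameTransformer(df)

    # Corrected argument order: column first, then the replacement string,
    # then the list of values to replace ('Caracas' and 'Ccs' both become 'Caracas').
    transformer.lookup('city', 'Caracas', ['Caracas', 'Ccs'])
    transformer.show()

The same reordering appears in both docs/index.rst and tests/tests.py, which keeps the documented examples and the test suite exercising a single lookup signature.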