In [2]:
from pyspark.sql import DataFrame, Column, Row, SparkSession
from pyspark.sql import functions as F
from typing import Dict, List
from functools import reduce

In [3]:
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/29 11:55:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [54]:
def extractWords(df: DataFrame, words: List[str],
            columns: List[str], outputColumn: str) -> DataFrame:
    """
    Extract the list of `words` from `columns` from the table `df` and
    output them into `outputColumn`.
    """
    # Convert the list of `words` into a string separated with pipes.
    regexList = "|".join(words)
    # Add parenthesis to the regexList so that they turn into a group.
    regexPattern = '(' + regexList + ')'
    # Define a local function to concat the columns `columns`.
    def concatInterestColumns(columns: List[str]) -> Column:
        # Define a separator.
        separator = '|'
        # Map the columns into a list of F.col() objects.
        columnsSpark = list(map(F.col, columns))
        # Return the concatenated columns as a concatenated column.
        return F.concat_ws(separator, *columnsSpark)
    # Define a local function to extract the `regexPattern` from `column`.
    def regexExtract(column: Column) -> Column:
        return F.regexp_extract(column, regexPattern, 0)
    # Define a composition of functions to concatenate and extract from
    # the concatenation to prevent the usage of multiple `.withColumn`.
    def extractFromConcatenatedColumns(columns: List[str]) -> Column:
        return regexExtract(concatInterestColumns(columns))
    # Apply the previous concatenation and name it with `outputColumn`. 
    return df.withColumn(outputColumn, extractFromConcatenatedColumns(columns))


def standardizeWords(df: DataFrame, standardDict: Dict[str, List[str]],
                    column: str) -> DataFrame:
    """
    Standardize the values within the column `column` from the table
    `df`, according to the information from `standardDict`.
    """
    # Start by inverting the dictionary so that for each element of the
    # list, it will standardize to the key.
    invertedDict = {value: key for (key, values) in standardDict.items()
                               for value in values}
    # Start the standardization query
    standardizeQuery = "CASE "
    # Dynamically generate the standardize query according to the values
    # from the inverted dictionary `invertedDict`.
    for (key, value) in invertedDict.items():
        standardizeQuery += f"WHEN {column} RLIKE '{key}' THEN '{value}' "
    # Finalize the standardization query.
    standardizeQuery += f"ELSE {column} END"
    # Evaluate the dictionary with `F.expr` so that we only use once the
    # evaluation from `withColumn` and optimize the process.
    print(standardizeQuery)
    return df.withColumn(column, F.expr(standardizeQuery))


def extractAndStandardizeWords(df: DataFrame, columns: List[str],
                               outputColumn: str,
                               stdDict: Dict[str, List[str]]) -> DataFrame:
    """
    Standardize the words `words` which are extracted from the columns
    `columns` over the table `df` by using the information from the
    standardization dictionary `standardDict`.
    """
    # Generate the words from the values of the standardize Dictionary.
    words = reduce(lambda list1, list2: list1 + list2, stdDict.values())
    # First extract the words using the `extractWords` function.
    extractedWords = extractWords(df, words, columns, outputColumn)
    # Next standardize the words using the `standardizeWords` function.
    standardizedWords = standardizeWords(extractedWords, standardDict,
                                         outputColumn)
    # Return the object in which the words were extracted and standardized.
    return standardizedWords

In [55]:
def loadData(filepath: str) -> DataFrame:
    """
    Loads the data from `filepath` and process it with the options for
    this specific project. Return a Spark DataFrame object containing
    its data.

    Parameters:
        filepath: str
            This is the path to the CSV containing the data.
    """
    options = {
        "header": True,
        "inferSchema": True,
        "nullValues": None
    }
    return spark.read.options(**options).csv(filepath)


def processColumnsUpper(df: DataFrame, columns: List[str]) -> DataFrame:
    """
    Transform the columns `columns` from the table `df` to upper case. 
    """
    layout = df.columns
    def upperCaseColumn(layout: List[str], column: str) -> Column:
        return (F.upper(column).alias(column) if column in layout
                                              else F.col(column))
    def upperCaseCols(layout: List[str], columns: List[str]) -> List[Column]:
        return list(map(lambda column: upperCaseColumn(layout, column), columns))
    newLayout = upperCaseCols(layout, columns)
    return df.select(newLayout)

In [56]:
cols = ["rating", "description"]
colsSpark = list(map(F.col, cols))
dfUpper = processColumnsUpper(df, cols)

In [57]:
standardDict = {
    "TEST": ["(^|\W)NFL(\W|$)", "TEAMMATES", "MISSION"]
}
extractAndStandardizeWords(dfUpper, cols, "test", standardDict).filter(F.col("test") != "").show()

CASE WHEN test RLIKE '(^|\W)NFL(\W|$)' THEN 'TEST' WHEN test RLIKE 'TEAMMATES' THEN 'TEST' WHEN test RLIKE 'MISSION' THEN 'TEST' ELSE test END
+------+--------------------+-----+
|rating|         description| test|
+------+--------------------+-----+
|     R|NEW NFL STAR THAD...| NFL |
|     R|BAD NEWS FROM THE...| TEST|
| TV-MA|IN THIS DARK COME...| TEST|
| TV-MA|WHEN A PECULIAR H...| TEST|
| TV-14|FIVE FRIENDS EMBA...| TEST|
| PG-13|AN FBI AGENT MAKE...| TEST|
| TV-PG|THIS DOCUMENTARY ...| TEST|
|     R|AFTER FAKING HIS ...| TEST|
| TV-MA|STAND-UP COMEDIAN...| NFL |
| TV-14|A REBELLIOUS DAUG...| TEST|
| TV-MA|ARMED WITH MYSTER...| TEST|
| TV-MA|IN EACH EPISODE O...| TEST|
| TV-MA|WHEN A YOUNG GIRL...| TEST|
| TV-MA|FROM CRIPPLING PA...| TEST|
|  TV-Y|OH TAKES IT UPON ...| TEST|
|    PG|SHE’S NOT THE USU...| TEST|
| TV-14|DRIVEN BY THE LES...| TEST|
| TV-14|AFTER HER HUSBAND...| TEST|
|     R|A HARDENED MERCEN...| TEST|
| TV-PG|HÜSEYIN HAS FOUND...| TEST|
+------+--------------------+

In [37]:
a = {
    "a": ["1", "2"],
    "b": ["x", "y"]
}
reduce(lambda a, b: a + b, a.values())

['1', '2', 'x', 'y']

In [42]:
{val: key for (key, value) in a.items() for val in value}

{'1': 'a', '2': 'a', 'x': 'b', 'y': 'b'}