Exploratory Analysis Code

In [None]:
import pandas as pd  # tested with version 1.1.5
from tqdm import tqdm
import gc
pd.options.mode.chained_assignment = None


class ExploratoryAnalysis:
    """"
    ExploratoryAnalysis prints specific analysis which represent
     the answer to each of the first 4 questions of the Data
     Engineering Bootcamp.

    Parameters:
        filename : str
            The system path where the file to be analysed is located.

    Attributes:
        df
        filename
        chunk_size

    Methods:
        chains_cnt (answer to question 1)
        prod_by_state (answer 2)
        top_chain (answer 3)
        prices_per_range (answer 4)
    """
    def __init__(self, filename):
        self.df = pd.DataFrame()
        self.filename = filename
        # change chunk_size depending on memory available
        # 10 ** 7 works fine with 16 gb of RAM
        self.chunk_size = 10 ** 7

    # question 1 answer
    def chains_cnt(self):
        """
        Prints the total number of unique chains.

        Reads the 'cadenaComercial' column to obtain all the existing
         commercial chains without duplicates.

        Returns:
            None

        Typical usage example:
        analysis = ExploratoryAnalysis(file_name)
        analysis.chains_cnt()
        """
        self._read_columns('cadenaComercial')
        print(len(self.df.unique()))
        del self.df
        gc.collect()

    # question 2 answer
    def prod_by_state(self, top_products):
        """
        Prints the top n (by amount) products by state.

        Uses the state and product columns to group them together. Results
         in a dataframe with three columns: state, product and frequency.

        Args:
            top_products: int.
                Number of products to be printed per state.

        Returns:
            None

        Typical usage example:
        analysis = ExploratoryAnalysis(file_name)
        analysis.prod_by_state(10)
        """
        self._read_columns(['producto', 'estado'])

        # reset_index is used to convert a Series object (returned by .size())
        # to a DataFrame.
        # At the same time it keeps the state column as part of the dataframe
        # (which is the index in the Series)
        self.df = self.df.groupby(by=['estado', 'producto'], dropna=True).size()
        self.df = self.df.reset_index()

        # sort values in a descending fashion to get the top n products by state
        self.df.sort_values(by=['estado', 0],
                            ascending=[True, False],
                            inplace=True)
        self.df = self.df.groupby('estado').head(top_products)
        self.df.rename(columns={0: 'cantidad'}, inplace=True)
        self.df.reset_index(drop=True, inplace=True)

        print(self.df)
        del self.df
        gc.collect()

    # question 3 answer
    def top_chain(self):
        """
        Prints the chain with the most products.

        Reads the 'cadenaComercial' column to obtain the chain with the most
         products including the number of products for the chain.

        Returns:
            None

        Typical usage example:
        analysis = ExploratoryAnalysis(file_name)
        analysis.top_chain()
        """
        self._read_columns('cadenaComercial')
        self.df = self.df.value_counts().reset_index().iloc[0, :]
        self.df.rename({0: 'Numero de productos', 'index': 'CadenaComercial'},
                       inplace=True)
        print(self.df)
        del self.df
        gc.collect()

    def prices_per_day(self):
        """
            Prints the count of products for ten price ranges.

            Works with the 'precio' and 'fechaRegistro' columns only. Shows
             the max and min price per range along with the number of products
             in that price range for each day of the week.

            Returns:
                None

            Typical usage example:
            analysis = ExploratoryAnalysis(file_name)
            analysis.prices_per_day()

            Notes:
                We subtract 1e-6 to min_price only for the first price range since
                 we want the inequality (self.df.values > min_price) to be inclusive
                 only in the first step. Which is why we sum 1e-6 to max_price to
                 keep it inclusive.
        """
        self._read_float_columns(['fechaRegistro', 'precio'])

        days_of_week = {0: 'Monday', 1: 'Tuesday', 2: 'Wednesday',
                        3: 'Thursday', 4: 'Friday', 5: 'Saturday',
                        6: 'Sunday'}
        for day_idx in range(7):
            day = self.df['fechaRegistro'].dt.dayofweek.isin([day_idx])
            price = self.df[day]['precio']
            range_length = 10
            print(f'\nPrice Ranges for {days_of_week[day_idx]}\n')
            range_size = (price.max() - price.min()) / range_length
            for cnt, i in enumerate(range(range_length)):
                min_price = (price.min() + cnt * range_size)
                max_price = (min_price + 1e-6 + range_size)
                min_price = min_price if cnt > 0 else (min_price - 1e-6)
                print("Min: {:.2f}".format(min_price),
                      ' Max: {:.2f}'.format(max_price))

                cond = (price.values > min_price) & (price.values <= max_price)
                print(len(self.df[day][cond]))
        del self.df
        gc.collect()

    # -------------------------------------------------------------------------
    # Private methods

    def _read_columns(self, col):
        """
        Reads the file in chunks.

        Saves all the rows in the file to self.df for the given column or list
         of columns.

        Args:
            col: str or list.
                Columns to be read into the series or dataframe.
        Returns:
            None

        Typical usage example:

        self._read_columns(['cadenaComercial', 'producto'])
        """
        if type(col) == list and len(col) > 1:
            self.df = pd.DataFrame()
        else:
            self.df = pd.Series()

        read_iter = pd.read_csv(self.filename, chunksize=self.chunk_size)
        for chunk in tqdm(read_iter):
            # found invalid rows when reading in the file
            invalid_rows = chunk['cadenaComercial'] == 'cadenaComercial'
            chunk.drop(chunk[invalid_rows].index, inplace=True)
            chunk = chunk[col]
            chunk.dropna(inplace=True)
            self.df = pd.concat([self.df, chunk])

    def _read_float_columns(self, col):
        """
        Reads the file in chunks.

        Saves all the rows in the file to self.df for the given float
        column or list of columns.

        Args:
            col: str or list.
                Float columns to be read into the series or dataframe.

        Returns:
            None

        Typical usage example:

        self._read_float_columns('precio')

        Notes:
            Coerce from pd.to_numeric leaves invalid values as null which
            is why we have to drop those values after that line.
        """
        read_iter = pd.read_csv(self.filename, chunksize=self.chunk_size)
        for chunk in tqdm(read_iter):
            chunk = chunk[col]
            chunk['precio'] = pd.to_numeric(chunk['precio'], errors='coerce')
            chunk.dropna(inplace=True)
            self.df = pd.concat([self.df, chunk])

        self.df['fechaRegistro'] = pd.to_datetime(self.df['fechaRegistro'],
                                                  format='%Y-%m-%d %H:%M:%S.%f',
                                                  errors='coerce')
        self.df.dropna(inplace=True)


pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

file_name = 'C:/path_to_file/all_data.csv'
analysis = ExploratoryAnalysis(file_name)
analysis.chains_cnt()
analysis.prod_by_state(10)
analysis.top_chain()
analysis.prices_per_day()
